blob: 1ea57ac11e72a3026b6e8b852bf1ed8a44938b81 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Georg Brandl1b37e872010-03-14 10:45:50 +000033from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000034from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000040try:
41 import threading
42except ImportError:
43 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000044
Guido van Rossuma9e20242007-03-08 00:43:48 +000045
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000046def _default_chunk_size():
47 """Get the default TextIOWrapper chunk size"""
48 with open(__file__, "r", encoding="latin1") as f:
49 return f._CHUNK_SIZE
50
51
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately lacks a read() method.

    Leaving read() out means the default RawIO.read(), which is built on
    top of readinto(), is what gets exercised.

    ``read_stack`` is the sequence of items handed out by readinto(), in
    order; a ``None`` item makes the corresponding readinto() call return
    ``None``.  Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Number of read calls made so far ...
        self._reads = 0
        # ... and how many of them happened after the script ran dry.
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # same comment as for seek()
        return 0

    def readinto(self, buf):
        """Copy the next scripted item (or as much as fits) into *buf*.

        Returns the number of bytes stored, ``None`` when the next scripted
        item is ``None``, and 0 once the script is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Too big for the buffer: fill it and keep the rest for later.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        # Nothing is actually truncated; the argument is echoed back.
        return pos
107
# MockRawIOWithoutRead combined with RawIOBase from ``io`` (the C
# implementation of io).
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass
110
# MockRawIOWithoutRead combined with RawIOBase from ``_pyio`` (the Python
# implementation of io).
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
113
114
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a scripted read() method."""

    def read(self, n=None):
        """Return the next scripted item, ignoring *n*.

        Each call hands out one whole entry of the read script, whatever
        its size.  Once the script is exhausted the call is counted in
        ``_extraneous_reads`` and ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script is expected here; anything else
            # (KeyboardInterrupt, programming errors) must propagate.
            self._extraneous_reads += 1
            return b""
124
# MockRawIO combined with RawIOBase from ``io`` (the C implementation).
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass
127
# MockRawIO combined with RawIOBase from ``_pyio`` (the Python
# implementation).
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000130
Guido van Rossuma9e20242007-03-08 00:43:48 +0000131
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were really recorded.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted item repeated twice.
        item = super().read(n)
        return item * 2

    def seek(self, pos, whence):
        # Negative position.
        return -123

    def tell(self):
        # Negative position.
        return -456

    def readinto(self, buf):
        # Do the real transfer, then report five times the buffer length.
        super().readinto(buf)
        bogus = len(buf) * 5
        return bogus
148
# MisbehavedRawIO combined with RawIOBase from ``io`` (the C implementation).
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass
151
# MisbehavedRawIO combined with RawIOBase from ``_pyio`` (the Python
# implementation).
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
154
155
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call fails with IOError."""

    # Flag flipped to 1 by the first close() call.
    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one raises.
        if self.closed:
            return
        self.closed = 1
        raise IOError
163
# CloseFailureIO combined with RawIOBase from ``io`` (the C implementation).
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass
166
# CloseFailureIO combined with RawIOBase from ``_pyio`` (the Python
# implementation).
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
169
170
class MockFileIO:
    """Mixin that logs the outcome of every read() / readinto() call.

    It relies on the next class in the MRO to provide the real
    ``__init__(data)``, ``read`` and ``readinto``.  ``read_history`` gets
    one entry per call: the length of the data returned by read() (or
    ``None`` if read() returned ``None``) and the raw return value of
    readinto().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000186
# MockFileIO layered over BytesIO from ``io`` (the C implementation).
class CMockFileIO(MockFileIO, io.BytesIO):
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000189
# MockFileIO layered over BytesIO from ``_pyio`` (the Python implementation).
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
192
193
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek() and tell() raise ``self.UnsupportedOperation``; that attribute
    is not defined here and has to be supplied by the concrete subclass.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
203
# MockUnseekableIO over BytesIO from ``io`` (the C implementation); supplies
# the exception type that the mixin's seek()/tell() raise.
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation
206
# MockUnseekableIO over BytesIO from ``_pyio`` (the Python implementation);
# supplies the exception type that the mixin's seek()/tell() raise.
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
209
210
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    the next write() whose data contains *char* stores only the part
    before it and raises ``self.BlockingIOError`` (an attribute the
    concrete subclass must provide).
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, or the part of it that precedes the blocker char.

        Returns the full length when nothing blocks.  When the armed
        blocker is found, the blocker is disarmed, the prefix is recorded
        and BlockingIOError is raised with the prefix length as its third
        argument.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = data.find(blocker)
            if cut != -1:
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
249
# MockNonBlockWriterIO over RawIOBase from ``io`` (the C implementation);
# supplies the exception type raised by the mixin's write().
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError
252
# MockNonBlockWriterIO over RawIOBase from ``_pyio`` (the Python
# implementation); supplies the exception type raised by the mixin's write().
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
255
Guido van Rossuma9e20242007-03-08 00:43:48 +0000256
class IOTest(unittest.TestCase):
    """Implementation-independent tests for file, raw and in-memory I/O.

    The objects under test are reached through attributes of ``self``
    (``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    ``self.RawIOBase``, ``self.BufferedIOBase``, ``self.TextIOBase``,
    ``self.UnsupportedOperation``, ``self.SEEK_CUR``, ``self.SEEK_END``,
    ``self.MockRawIOWithoutRead``).  None of them is defined in this class.
    NOTE(review): presumably they are injected on the concrete subclasses
    elsewhere in the file -- confirm.
    """

    def setUp(self):
        # Start every test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script on the writable stream *f*.

        The script leaves ``b"hello world\\n"`` as the stream content.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() is expected to leave the stream position untouched.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script on *f*, which holds b"hello world\\n".

        With ``buffered`` true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left, so the buffer is partially overwritten.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the ``LARGE`` offset on *f*."""
        # Use unittest assertions rather than ``assert`` statements so the
        # checks are not stripped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary file: write script, then read
        # script.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # The same write/read scripts, run against an in-memory BytesIO.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and silently
                # passing.
                self.skipTest(
                    "large file ops skipped on %s: requires %d bytes and a "
                    "long time; use 'regrtest.py -u largefile test_io' to "
                    "run it" % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block ends normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an open FileIO subclass instance must run __del__,
        # close() and flush(), in that order, and leave the written data in
        # the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check __del__ -> close() -> flush() ordering for a *base* subclass."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written through a buffered file must be on disk once the file
        # has been closed.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; closing it must not
            # close the descriptor owned by ``f``.
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Reference cycle: the object refers to itself.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
610
# IOTest variant for the C implementation.  Nothing is overridden here.
# NOTE(review): the implementation-specific attributes IOTest reads from
# ``self`` (open, FileIO, BytesIO, ...) are presumably attached elsewhere in
# the file -- confirm.
class CIOTest(IOTest):
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000613
# IOTest variant for the Python implementation.  Nothing is overridden here.
# NOTE(review): the implementation-specific attributes IOTest reads from
# ``self`` (open, FileIO, BytesIO, ...) are presumably attached elsewhere in
# the file -- confirm.
class PyIOTest(IOTest):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000616
Guido van Rossuma9e20242007-03-08 00:43:48 +0000617
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: the buffered class under test is read from
    # ``self.tp`` and the raw-stream mocks from ``self.MockRawIO``,
    # ``self.CloseFailureIO`` and ``self.MockUnseekableIO``; the assertion
    # methods come from the unittest.TestCase it is combined with.

    def test_detach(self):
        # detach() returns the raw stream once; a second call fails.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock answers 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        # 9 rather than 3: 3 is os.SEEK_DATA on platforms that provide it,
        # i.e. a potentially valid whence value.
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Destroying the object must call __del__, then close(), and (for
        # writable objects) flush().
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the class and, once the raw stream has one, its name.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed object is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() must fail on top of an unseekable raw stream.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
730
Guido van Rossum78892e42007-04-06 17:31:18 +0000731
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests shared by the BufferedReader implementations.  Subclasses pick
    # the class under test through the `tp` attribute.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ can be re-run on a live object; a buffer_size of zero or
        # less is rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # Both read(None) and read(7) return the whole 7-byte stream.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is checked after every call: it must stay unchanged
        # while read1() can still be served from already-buffered data.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() fills a 2-byte buffer across raw chunk boundaries; a
        # short final read leaves the untouched tail of the buffer as it was.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # A fresh reader is built for every call so each readlines() variant
        # starts from the beginning of the data.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected rawio.read_history afterwards]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        # Asking for more than is available returns just what exists.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns every chunk concatenated.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # the exception propagate in this thread as well.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() on top of a MisbehavedRawIO must raise IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
910
911
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against io.BufferedReader and adds
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; make sure it is removed again
        # once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cyclic collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000952
class PyBufferedReaderTest(BufferedReaderTest):
    # Same shared reader tests, bound to the pyio implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000955
Guido van Rossuma9e20242007-03-08 00:43:48 +0000956
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests shared by the BufferedWriter implementations.  Subclasses pick
    # the class under test through the `tp` attribute.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ can be re-run on a live object; a buffer_size of zero or
        # less is rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func(bufio)` is invoked after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around (absolutely, then relatively) between writes, always
        # returning to the original position.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must land at the right raw offsets.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; remove it again when the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # the exception propagate in this thread as well.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write() on top of a
        # MisbehavedRawIO must raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001172
1173
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against io.BufferedWriter and adds
    # checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__ the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; make sure it is removed again
        # once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cyclic collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1211
1212
class PyBufferedWriterTest(BufferedWriterTest):
    # Same shared writer tests, bound to the pyio implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001215
class BufferedRWPairTest(unittest.TestCase):
    # Tests shared by the BufferedRWPair implementations.  Subclasses pick
    # the class under test through the `tp` attribute; the first constructor
    # argument is the reader, the second the writer.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair has no single underlying stream to hand back.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each readlines() variant
        # starts from the beginning of the data.  The three forms checked
        # (no hint, None, a size hint) mirror BufferedReaderTest.test_readlines.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write/flush round must reach the writer as its own chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() may return more than requested, hence startswith(); the
        # following read() shows that the peeked bytes were not consumed.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side is one.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001333
class CBufferedRWPairTest(BufferedRWPairTest):
    # Shared pair tests, bound to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001336
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Shared pair tests, bound to the pyio implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001339
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340
1341class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1342 read_mode = "rb+"
1343 write_mode = "wb+"
1344
1345 def test_constructor(self):
1346 BufferedReaderTest.test_constructor(self)
1347 BufferedWriterTest.test_constructor(self)
1348
1349 def test_read_and_write(self):
1350 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001351 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001352
1353 self.assertEqual(b"as", rw.read(2))
1354 rw.write(b"ddd")
1355 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001356 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001357 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001358 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001359
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 def test_seek_and_tell(self):
1361 raw = self.BytesIO(b"asdfghjkl")
1362 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001363
1364 self.assertEquals(b"as", rw.read(2))
1365 self.assertEquals(2, rw.tell())
1366 rw.seek(0, 0)
1367 self.assertEquals(b"asdf", rw.read(4))
1368
1369 rw.write(b"asdf")
1370 rw.seek(0, 0)
1371 self.assertEquals(b"asdfasdfl", rw.read())
1372 self.assertEquals(9, rw.tell())
1373 rw.seek(-4, 2)
1374 self.assertEquals(5, rw.tell())
1375 rw.seek(2, 1)
1376 self.assertEquals(7, rw.tell())
1377 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001378 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001379
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001380 def check_flush_and_read(self, read_func):
1381 raw = self.BytesIO(b"abcdefghi")
1382 bufio = self.tp(raw)
1383
1384 self.assertEquals(b"ab", read_func(bufio, 2))
1385 bufio.write(b"12")
1386 self.assertEquals(b"ef", read_func(bufio, 2))
1387 self.assertEquals(6, bufio.tell())
1388 bufio.flush()
1389 self.assertEquals(6, bufio.tell())
1390 self.assertEquals(b"ghi", read_func(bufio))
1391 raw.seek(0, 0)
1392 raw.write(b"XYZ")
1393 # flush() resets the read buffer
1394 bufio.flush()
1395 bufio.seek(0, 0)
1396 self.assertEquals(b"XYZ", read_func(bufio, 3))
1397
1398 def test_flush_and_read(self):
1399 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1400
1401 def test_flush_and_readinto(self):
1402 def _readinto(bufio, n=-1):
1403 b = bytearray(n if n >= 0 else 9999)
1404 n = bufio.readinto(b)
1405 return bytes(b[:n])
1406 self.check_flush_and_read(_readinto)
1407
1408 def test_flush_and_peek(self):
1409 def _peek(bufio, n=-1):
1410 # This relies on the fact that the buffer can contain the whole
1411 # raw stream, otherwise peek() can return less.
1412 b = bufio.peek(n)
1413 if n != -1:
1414 b = b[:n]
1415 bufio.seek(len(b), 1)
1416 return b
1417 self.check_flush_and_read(_peek)
1418
1419 def test_flush_and_write(self):
1420 raw = self.BytesIO(b"abcdefghi")
1421 bufio = self.tp(raw)
1422
1423 bufio.write(b"123")
1424 bufio.flush()
1425 bufio.write(b"45")
1426 bufio.flush()
1427 bufio.seek(0, 0)
1428 self.assertEquals(b"12345fghi", raw.getvalue())
1429 self.assertEquals(b"12345fghi", bufio.read())
1430
1431 def test_threads(self):
1432 BufferedReaderTest.test_threads(self)
1433 BufferedWriterTest.test_threads(self)
1434
1435 def test_writes_and_peek(self):
1436 def _peek(bufio):
1437 bufio.peek(1)
1438 self.check_writes(_peek)
1439 def _peek(bufio):
1440 pos = bufio.tell()
1441 bufio.seek(-1, 1)
1442 bufio.peek(1)
1443 bufio.seek(pos, 0)
1444 self.check_writes(_peek)
1445
1446 def test_writes_and_reads(self):
1447 def _read(bufio):
1448 bufio.seek(-1, 1)
1449 bufio.read(1)
1450 self.check_writes(_read)
1451
1452 def test_writes_and_read1s(self):
1453 def _read1(bufio):
1454 bufio.seek(-1, 1)
1455 bufio.read1(1)
1456 self.check_writes(_read1)
1457
1458 def test_writes_and_readintos(self):
1459 def _read(bufio):
1460 bufio.seek(-1, 1)
1461 bufio.readinto(bytearray(1))
1462 self.check_writes(_read)
1463
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001464 def test_write_after_readahead(self):
1465 # Issue #6629: writing after the buffer was filled by readahead should
1466 # first rewind the raw stream.
1467 for overwrite_size in [1, 5]:
1468 raw = self.BytesIO(b"A" * 10)
1469 bufio = self.tp(raw, 4)
1470 # Trigger readahead
1471 self.assertEqual(bufio.read(1), b"A")
1472 self.assertEqual(bufio.tell(), 1)
1473 # Overwriting should rewind the raw stream if it needs so
1474 bufio.write(b"B" * overwrite_size)
1475 self.assertEqual(bufio.tell(), overwrite_size + 1)
1476 # If the write size was smaller than the buffer size, flush() and
1477 # check that rewind happens.
1478 bufio.flush()
1479 self.assertEqual(bufio.tell(), overwrite_size + 1)
1480 s = raw.getvalue()
1481 self.assertEqual(s,
1482 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1483
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001484 def test_truncate_after_read_or_write(self):
1485 raw = self.BytesIO(b"A" * 10)
1486 bufio = self.tp(raw, 100)
1487 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1488 self.assertEqual(bufio.truncate(), 2)
1489 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1490 self.assertEqual(bufio.truncate(), 4)
1491
def test_misbehaved_io(self):
    # Reuse the misbehaved-raw-stream tests of both the reader and the
    # writer test classes, calling them with this test case as self.
    BufferedReaderTest.test_misbehaved_io(self)
    BufferedWriterTest.test_misbehaved_io(self)
1495
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001496 # You can't construct a BufferedRandom over a non-seekable stream.
1497 test_unseekable = None
1498
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initializing with an absurd buffer size must fail cleanly.
        failures = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(failures):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage collection tests of the C reader and writer
        # test classes.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)
1515
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1518
1519
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001520# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1521# properties:
1522# - A single output character can correspond to many bytes of input.
1523# - The number of input bytes to complete the character can be
1524# undetermined until the last input byte is received.
1525# - The number of input bytes can vary depending on previous input.
1526# - A single input byte can correspond to many characters of output.
1527# - The number of output characters can be undetermined until the
1528# last input byte is received.
1529# - The number of output characters can vary depending on previous input.
1530
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder used to exercise seek/tell.

    The input is a sequence of words.  A word is either fixed-length (the
    length being the current input length I) or, when I is 0,
    variable-length and terminated by a period; extra periods are ignored
    in that mode.  Words are interpreted as follows:
      - 'i' followed by a number sets the input length I (maximum 99).
      - 'o' followed by a number sets the output length O (maximum 99).
      - Any other word is emitted followed by a period.  The emitted word
        is the input word truncated, or padded with hyphens, to exactly O
        characters.  If O is 0 the word is emitted verbatim.
    I and O both start at 1.  EOF (final=True) also terminates the last
    word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags); flags is 0 right after reset()."""
        # XOR with 1 so that the initial I = O = 1 encodes as 0.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        in_flag, out_flag = divmod(flags, 100)
        self.i = in_flag ^ 1
        self.o = out_flag ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of input and return the text decoded so far."""
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i == 0:
                # Variable-length mode: a period ends the word, and
                # periods with nothing buffered are ignored.
                if byte != period:
                    self.buffer.append(byte)
                elif self.buffer:
                    pieces.append(self.process_word())
            else:
                # Fixed-length mode: the word ends after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        result = ''
        marker = self.buffer[0]
        if marker == ord('i'):
            # Set the input length.
            self.i = min(99, int(self.buffer[1:] or 0))
        elif marker == ord('o'):
            # Set the output length.
            self.o = min(99, int(self.buffer[1:] or 0))
        else:
            word = self.buffer.decode('ascii')
            # Pad short words with hyphens, then cut to the output length;
            # an output length of 0 leaves the word as it is.
            word = word.ljust(self.o, '-')
            if self.o:
                word = word[:self.o]
            result = word + '.'
        self.buffer = bytearray()
        return result

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' only while enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1615
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
# The search function returns None unless codecEnabled is set, so the
# registration by itself does not make 'test_decoder' resolvable.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1619
1620
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001663
1664class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001665
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the text
    # expected once every line ending is normalized to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start from a clean slate: remove any leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001670
def tearDown(self):
    # Remove the scratch file that some tests create.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001674 def test_constructor(self):
1675 r = self.BytesIO(b"\xc3\xa9\n\n")
1676 b = self.BufferedReader(r, 1000)
1677 t = self.TextIOWrapper(b)
1678 t.__init__(b, encoding="latin1", newline="\r\n")
1679 self.assertEquals(t.encoding, "latin1")
1680 self.assertEquals(t.line_buffering, False)
1681 t.__init__(b, encoding="utf8", line_buffering=True)
1682 self.assertEquals(t.encoding, "utf8")
1683 self.assertEquals(t.line_buffering, True)
1684 self.assertEquals("\xe9\n", t.readline())
1685 self.assertRaises(TypeError, t.__init__, b, newline=42)
1686 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1687
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001688 def test_detach(self):
1689 r = self.BytesIO()
1690 b = self.BufferedWriter(r)
1691 t = self.TextIOWrapper(b)
1692 self.assertIs(t.detach(), b)
1693
1694 t = self.TextIOWrapper(b, encoding="ascii")
1695 t.write("howdy")
1696 self.assertFalse(r.getvalue())
1697 t.detach()
1698 self.assertEqual(r.getvalue(), b"howdy")
1699 self.assertRaises(ValueError, t.detach)
1700
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001701 def test_repr(self):
1702 raw = self.BytesIO("hello".encode("utf-8"))
1703 b = self.BufferedReader(raw)
1704 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001705 modname = self.TextIOWrapper.__module__
1706 self.assertEqual(repr(t),
1707 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1708 raw.name = "dummy"
1709 self.assertEqual(repr(t),
1710 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1711 raw.name = b"dummy"
1712 self.assertEqual(repr(t),
1713 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001715 def test_line_buffering(self):
1716 r = self.BytesIO()
1717 b = self.BufferedWriter(r, 1000)
1718 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001719 t.write("X")
1720 self.assertEquals(r.getvalue(), b"") # No flush happened
1721 t.write("Y\nZ")
1722 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1723 t.write("A\rB")
1724 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001726 def test_encoding(self):
1727 # Check the encoding attribute is always set, and valid
1728 b = self.BytesIO()
1729 t = self.TextIOWrapper(b, encoding="utf8")
1730 self.assertEqual(t.encoding, "utf8")
1731 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001732 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001733 codecs.lookup(t.encoding)
1734
1735 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001736 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001737 b = self.BytesIO(b"abc\n\xff\n")
1738 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001739 self.assertRaises(UnicodeError, t.read)
1740 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001741 b = self.BytesIO(b"abc\n\xff\n")
1742 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001743 self.assertRaises(UnicodeError, t.read)
1744 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001745 b = self.BytesIO(b"abc\n\xff\n")
1746 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001747 self.assertEquals(t.read(), "abc\n\n")
1748 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001749 b = self.BytesIO(b"abc\n\xff\n")
1750 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001751 self.assertEquals(t.read(), "abc\n\ufffd\n")
1752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001754 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001755 b = self.BytesIO()
1756 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001757 self.assertRaises(UnicodeError, t.write, "\xff")
1758 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001759 b = self.BytesIO()
1760 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001761 self.assertRaises(UnicodeError, t.write, "\xff")
1762 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001763 b = self.BytesIO()
1764 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001765 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001766 t.write("abc\xffdef\n")
1767 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001768 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001769 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001770 b = self.BytesIO()
1771 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001772 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001773 t.write("abc\xffdef\n")
1774 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001775 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001776
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001777 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001778 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1779
1780 tests = [
1781 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001782 [ '', input_lines ],
1783 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1784 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1785 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001786 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001787 encodings = (
1788 'utf-8', 'latin-1',
1789 'utf-16', 'utf-16-le', 'utf-16-be',
1790 'utf-32', 'utf-32-le', 'utf-32-be',
1791 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001792
Guido van Rossum8358db22007-08-18 21:39:55 +00001793 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001794 # character in TextIOWrapper._pending_line.
1795 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001796 # XXX: str.encode() should return bytes
1797 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001798 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001799 for bufsize in range(1, 10):
1800 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001801 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1802 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001803 encoding=encoding)
1804 if do_reads:
1805 got_lines = []
1806 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001807 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001808 if c2 == '':
1809 break
1810 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001811 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001812 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001813 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001814
1815 for got_line, exp_line in zip(got_lines, exp_lines):
1816 self.assertEquals(got_line, exp_line)
1817 self.assertEquals(len(got_lines), len(exp_lines))
1818
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001819 def test_newlines_input(self):
1820 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001821 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1822 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001823 (None, normalized.decode("ascii").splitlines(True)),
1824 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001825 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1826 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1827 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001828 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001829 buf = self.BytesIO(testdata)
1830 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001831 self.assertEquals(txt.readlines(), expected)
1832 txt.seek(0)
1833 self.assertEquals(txt.read(), "".join(expected))
1834
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001835 def test_newlines_output(self):
1836 testdict = {
1837 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1838 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1839 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1840 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1841 }
1842 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1843 for newline, expected in tests:
1844 buf = self.BytesIO()
1845 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1846 txt.write("AAA\nB")
1847 txt.write("BB\nCCC\n")
1848 txt.write("X\rY\r\nZ")
1849 txt.flush()
1850 self.assertEquals(buf.closed, False)
1851 self.assertEquals(buf.getvalue(), expected)
1852
def test_destructor(self):
    # Destroying the wrapper must flush pending text and close the
    # underlying stream; close() records what the stream holds then.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
1866
def test_override_destructor(self):
    # Overridden __del__(), close() and flush() must each be called, in
    # that order, when the wrapper is destroyed.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may not define __del__ at all.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
1889
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        # The wrapper is a temporary, and the attribute lookup on it
        # raises AttributeError.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as s:
        self.assertRaises(AttributeError, f)
    s = s.getvalue().strip()
    if s:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(s.splitlines()), 1)
        self.assertTrue(s.startswith("Exception IOError: "), s)
        self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001904
Guido van Rossum9b76da62007-04-11 01:09:03 +00001905 # Systematic tests of the text I/O API
1906
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several encodings and
    # chunk sizes.  The files are opened in "with" blocks so that they are
    # closed even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at the end of the initial content.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1935
def multi_line_test(self, f, enc):
    """Write lines of many lengths to f and check tell() while reading back.

    f is emptied first.  Each written line is recorded with the position
    reported by tell() just before it was written; reading the file back
    line by line must yield the same (position, line) pairs.  enc is
    accepted for the caller's convenience and is not used here.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of the
        # requested length.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1957
def test_telling(self):
    # tell() must report the same positions while reading as it did while
    # writing, and must fail while the file is being iterated over.
    # The file is opened in a "with" block so that it is closed even when
    # an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1977
def test_seeking(self):
    # Put a multi-byte character two bytes before the default chunk size
    # and check that tell() and readline() still agree around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The original never closed this file; the "with" block does.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1995
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The original never closed this file; the "with" block does.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2007
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a "with" block so that it is closed
        # even when an assertion fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002054
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002055 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002056 data = "1234567890"
2057 tests = ("utf-16",
2058 "utf-16-le",
2059 "utf-16-be",
2060 "utf-32",
2061 "utf-32-le",
2062 "utf-32-be")
2063 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064 buf = self.BytesIO()
2065 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002066 # Check if the BOM is written only once (see issue1753).
2067 f.write(data)
2068 f.write(data)
2069 f.seek(0)
2070 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002071 f.seek(0)
2072 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002073 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2074
Benjamin Petersona1b49012009-03-31 23:11:32 +00002075 def test_unreadable(self):
2076 class UnReadable(self.BytesIO):
2077 def readable(self):
2078 return False
2079 txt = self.TextIOWrapper(UnReadable())
2080 self.assertRaises(IOError, txt.read)
2081
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002082 def test_read_one_by_one(self):
2083 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002084 reads = ""
2085 while True:
2086 c = txt.read(1)
2087 if not c:
2088 break
2089 reads += c
2090 self.assertEquals(reads, "AA\nBB")
2091
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002092 def test_readlines(self):
2093 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2094 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2095 txt.seek(0)
2096 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2097 txt.seek(0)
2098 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2099
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002100 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002101 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002102 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002103 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002104 reads = ""
2105 while True:
2106 c = txt.read(128)
2107 if not c:
2108 break
2109 reads += c
2110 self.assertEquals(reads, "A"*127+"\nB")
2111
2112 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002113 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002114
2115 # read one char at a time
2116 reads = ""
2117 while True:
2118 c = txt.read(1)
2119 if not c:
2120 break
2121 reads += c
2122 self.assertEquals(reads, self.normalized)
2123
2124 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002125 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002126 txt._CHUNK_SIZE = 4
2127
2128 reads = ""
2129 while True:
2130 c = txt.read(4)
2131 if not c:
2132 break
2133 reads += c
2134 self.assertEquals(reads, self.normalized)
2135
2136 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002137 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002138 txt._CHUNK_SIZE = 4
2139
2140 reads = txt.read(4)
2141 reads += txt.read(4)
2142 reads += txt.readline()
2143 reads += txt.readline()
2144 reads += txt.readline()
2145 self.assertEquals(reads, self.normalized)
2146
2147 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002149 txt._CHUNK_SIZE = 4
2150
2151 reads = txt.read(4)
2152 reads += txt.read()
2153 self.assertEquals(reads, self.normalized)
2154
2155 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002156 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002157 txt._CHUNK_SIZE = 4
2158
2159 reads = txt.read(4)
2160 pos = txt.tell()
2161 txt.seek(0)
2162 txt.seek(pos)
2163 self.assertEquals(txt.read(4), "BBB\n")
2164
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002165 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002166 buffer = self.BytesIO(self.testdata)
2167 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002168
2169 self.assertEqual(buffer.seekable(), txt.seekable())
2170
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The original bound this result to an unused local; the call
            # itself is kept so the sequence of operations on the file is
            # unchanged.
            f.tell()
        with self.open(filename, 'rb') as f:
            # assertEqual rather than the deprecated assertEquals alias.
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # Encoding the whole string at once yields exactly one BOM.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2185
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Remember the position just past 'aaa' for the seek below.
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write at the old end first, then overwrite from the start.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            # assertEqual rather than the deprecated assertEquals alias.
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2200
def test_errors_property(self):
    # The errors attribute reads "strict" when no errors argument is
    # given, and echoes the argument when one is passed.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2206
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Every worker blocks here until the event is set below.
            event.wait()
            f.write(text)
        # Pass the thread number through args= instead of wrapping run()
        # in a default-argument lambda.
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            # Each thread's line must appear exactly once.
            # assertEqual rather than the deprecated assertEquals alias.
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2228
Antoine Pitrou6be88762010-05-03 16:48:20 +00002229 def test_flush_error_on_close(self):
2230 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2231 def bad_flush():
2232 raise IOError()
2233 txt.flush = bad_flush
2234 self.assertRaises(IOError, txt.close) # exception not swallowed
2235
2236 def test_multi_close(self):
2237 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2238 txt.close()
2239 txt.close()
2240 txt.close()
2241 self.assertRaises(ValueError, txt.flush)
2242
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002243 def test_unseekable(self):
2244 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2245 self.assertRaises(self.UnsupportedOperation, txt.tell)
2246 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2247
class CTextIOWrapperTest(TextIOWrapperTest):
    # Additional TextIOWrapper checks on top of the shared
    # TextIOWrapperTest cases.

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # After each rejected re-initialization, read() is expected to
        # raise ValueError.
        for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Make the wrapper part of a reference cycle so that only the
        # garbage collector can reclaim it.
        wrapper.x = wrapper
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2274
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Inherits every TextIOWrapperTest case unchanged."""
2277
2278
2279class IncrementalNewlineDecoderTest(unittest.TestCase):
2280
2281 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002282 # UTF-8 specific tests for a newline decoder
2283 def _check_decode(b, s, **kwargs):
2284 # We exercise getstate() / setstate() as well as decode()
2285 state = decoder.getstate()
2286 self.assertEquals(decoder.decode(b, **kwargs), s)
2287 decoder.setstate(state)
2288 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002289
Antoine Pitrou180a3362008-12-14 16:36:46 +00002290 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002291
Antoine Pitrou180a3362008-12-14 16:36:46 +00002292 _check_decode(b'\xe8', "")
2293 _check_decode(b'\xa2', "")
2294 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002295
Antoine Pitrou180a3362008-12-14 16:36:46 +00002296 _check_decode(b'\xe8', "")
2297 _check_decode(b'\xa2', "")
2298 _check_decode(b'\x88', "\u8888")
2299
2300 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002301 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2302
Antoine Pitrou180a3362008-12-14 16:36:46 +00002303 decoder.reset()
2304 _check_decode(b'\n', "\n")
2305 _check_decode(b'\r', "")
2306 _check_decode(b'', "\n", final=True)
2307 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002308
Antoine Pitrou180a3362008-12-14 16:36:46 +00002309 _check_decode(b'\r', "")
2310 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002311
Antoine Pitrou180a3362008-12-14 16:36:46 +00002312 _check_decode(b'\r\r\n', "\n\n")
2313 _check_decode(b'\r', "")
2314 _check_decode(b'\r', "\n")
2315 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002316
Antoine Pitrou180a3362008-12-14 16:36:46 +00002317 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2318 _check_decode(b'\xe8\xa2\x88', "\u8888")
2319 _check_decode(b'\n', "\n")
2320 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2321 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002322
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002323 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002324 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002325 if encoding is not None:
2326 encoder = codecs.getincrementalencoder(encoding)()
2327 def _decode_bytewise(s):
2328 # Decode one byte at a time
2329 for b in encoder.encode(s):
2330 result.append(decoder.decode(bytes([b])))
2331 else:
2332 encoder = None
2333 def _decode_bytewise(s):
2334 # Decode one char at a time
2335 for c in s:
2336 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002337 self.assertEquals(decoder.newlines, None)
2338 _decode_bytewise("abc\n\r")
2339 self.assertEquals(decoder.newlines, '\n')
2340 _decode_bytewise("\nabc")
2341 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2342 _decode_bytewise("abc\r")
2343 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2344 _decode_bytewise("abc")
2345 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2346 _decode_bytewise("abc\r")
2347 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2348 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002349 input = "abc"
2350 if encoder is not None:
2351 encoder.reset()
2352 input = encoder.encode(input)
2353 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002354 self.assertEquals(decoder.newlines, None)
2355
2356 def test_newline_decoder(self):
2357 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002358 # None meaning the IncrementalNewlineDecoder takes unicode input
2359 # rather than bytes input
2360 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002361 'utf-16', 'utf-16-le', 'utf-16-be',
2362 'utf-32', 'utf-32-le', 'utf-32-be',
2363 )
2364 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 decoder = enc and codecs.getincrementaldecoder(enc)()
2366 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2367 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002368 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002369 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2370 self.check_newline_decoding_utf8(decoder)
2371
Antoine Pitrou66913e22009-03-06 23:40:56 +00002372 def test_newline_bytes(self):
2373 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2374 def _check(dec):
2375 self.assertEquals(dec.newlines, None)
2376 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2377 self.assertEquals(dec.newlines, None)
2378 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2379 self.assertEquals(dec.newlines, None)
2380 dec = self.IncrementalNewlineDecoder(None, translate=False)
2381 _check(dec)
2382 dec = self.IncrementalNewlineDecoder(None, translate=True)
2383 _check(dec)
2384
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherits every IncrementalNewlineDecoderTest case unchanged."""
2387
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherits every IncrementalNewlineDecoderTest case unchanged."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002390
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002391
Guido van Rossum01a27522007-03-07 01:00:12 +00002392# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002393
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the io module held in self.io (bound by
    # the subclasses) and of the file objects self.open returns.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist; apart from open() and the
        # SEEK_* constants, each is an exception class or an IOBase
        # subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # `with` closes each file even when an assertion fails, and
        # assertEqual replaces the deprecated assertEquals alias.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # Opening by file descriptor: the name attribute is the descriptor.
        g = self.open(f.fileno(), "wb", closefd=False)
        self.assertEqual(g.mode, "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name, f.fileno())
        self.assertEqual(g.raw.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # Every I/O method must raise ValueError on a closed file, for
        # each combination of mode and buffering below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto exist only on some layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Binary files take bytes, text files take str.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Tie the exception and its argument into a reference cycle and
        # check that the garbage collector can still reclaim them.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # Check each kind of file against the ABCs found on `abcmodule`:
        # unbuffered binary is raw only, buffered binary is buffered
        # only, text is text only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        # Dropping the last reference to an unclosed file must emit a
        # ResourceWarning whose message contains the file's repr.
        # NOTE(review): this calls the builtin open(), not self.open --
        # confirm that is intended for both subclasses.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        # Same check for files opened from a pipe descriptor. Every
        # descriptor created here is recorded in `fds` and closed in the
        # finally clause.
        fds = []
        try:
            r, w = os.pipe()
            fds += r, w
            self._check_warn_on_dealloc(r, *args, **kwargs)
            # When using closefd=False, there's no warning
            r, w = os.pipe()
            fds += r, w
            with warnings.catch_warnings(record=True) as recorded:
                open(r, *args, closefd=False, **kwargs)
                support.gc_collect()
            self.assertEqual(recorded, [])
        finally:
            for fd in fds:
                try:
                    os.close(fd)
                except EnvironmentError as e:
                    # EBADF means the descriptor is already closed;
                    # anything else is re-raised.
                    if e.errno != errno.EBADF:
                        raise

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")
2568
2569
class CMiscIOTest(MiscIOTest):
    """MiscIOTest with self.io bound to the module-level name `io`."""

    io = io
2572
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest with self.io bound to the module-level name `pyio`."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002575
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002576
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks that a signal arriving during a blocking write runs its
    # Python handler. Subclasses bind self.io to the module under test.

    def setUp(self):
        # Install the failing handler, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError so the test can detect
        # that the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        `item` is the unit repeated to build the payload that is written,
        `bytes` holds the two bytes expected first on the read end of the
        pipe, and `fdopen_kwargs` are passed to self.io.open for the
        write end.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try so the cleanup below can tell whether
        # open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel an alarm that has not fired yet, so that it cannot
            # be delivered after tearDown() has restored the old handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2638
class CSignalsTest(SignalsTest):
    """SignalsTest with self.io bound to the module-level name `io`."""

    io = io
2641
class PySignalsTest(SignalsTest):
    """SignalsTest with self.io bound to the module-level name `pyio`."""

    io = pyio
2644
2645
def test_main():
    # Attach the io names each test class needs, then run all of them.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    # Class-name prefix paired with the module that supplies its names.
    # "C" comes first, matching the order in which the prefixes are tested.
    flavours = (("C", io), ("Py", pyio))
    namespaces = {}
    for prefix, module in flavours:
        namespaces[prefix] = dict(
            (name, getattr(module, name)) for name in all_members)
    globs = globals()
    for prefix, _module in flavours:
        # The mocks are published under their unprefixed names, e.g.
        # "MockRawIO" maps to the global CMockRawIO or PyMockRawIO.
        namespaces[prefix].update(
            (mock.__name__, globs[prefix + mock.__name__]) for mock in mocks)
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper
    for test in tests:
        for prefix, _module in flavours:
            if test.__name__.startswith(prefix):
                for name, obj in namespaces[prefix].items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002680
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()