blob: 8784e340017ddfc4aea641732d0762b38825887f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Georg Brandl1b37e872010-03-14 10:45:50 +000032from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000033from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000039try:
40 import threading
41except ImportError:
42 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000043
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000045def _default_chunk_size():
46 """Get the default TextIOWrapper chunk size"""
47 with open(__file__, "r", encoding="latin1") as f:
48 return f._CHUNK_SIZE
49
50
Antoine Pitrou328ec742010-09-14 18:37:24 +000051class MockRawIOWithoutRead:
52 """A RawIO implementation without read(), so as to exercise the default
53 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000054
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000055 def __init__(self, read_stack=()):
56 self._read_stack = list(read_stack)
57 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000058 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000059 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000060
Guido van Rossum01a27522007-03-07 01:00:12 +000061 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000062 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000063 return len(b)
64
65 def writable(self):
66 return True
67
Guido van Rossum68bbcd22007-02-27 17:19:33 +000068 def fileno(self):
69 return 42
70
71 def readable(self):
72 return True
73
Guido van Rossum01a27522007-03-07 01:00:12 +000074 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 return True
76
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000079
80 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 return 0 # same comment as above
82
83 def readinto(self, buf):
84 self._reads += 1
85 max_len = len(buf)
86 try:
87 data = self._read_stack[0]
88 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000089 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000090 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
Antoine Pitrou328ec742010-09-14 18:37:24 +0000107class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
108 pass
109
110class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
111 pass
112
113
114class MockRawIO(MockRawIOWithoutRead):
115
116 def read(self, n=None):
117 self._reads += 1
118 try:
119 return self._read_stack.pop(0)
120 except:
121 self._extraneous_reads += 1
122 return b""
123
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000124class CMockRawIO(MockRawIO, io.RawIOBase):
125 pass
126
127class PyMockRawIO(MockRawIO, pyio.RawIOBase):
128 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000129
Guido van Rossuma9e20242007-03-08 00:43:48 +0000130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class MisbehavedRawIO(MockRawIO):
132 def write(self, b):
133 return super().write(b) * 2
134
135 def read(self, n=None):
136 return super().read(n) * 2
137
138 def seek(self, pos, whence):
139 return -123
140
141 def tell(self):
142 return -456
143
144 def readinto(self, buf):
145 super().readinto(buf)
146 return len(buf) * 5
147
148class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
149 pass
150
151class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
152 pass
153
154
155class CloseFailureIO(MockRawIO):
156 closed = 0
157
158 def close(self):
159 if not self.closed:
160 self.closed = 1
161 raise IOError
162
163class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
164 pass
165
166class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
167 pass
168
169
170class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000171
172 def __init__(self, data):
173 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000174 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000175
176 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000177 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000178 self.read_history.append(None if res is None else len(res))
179 return res
180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 def readinto(self, b):
182 res = super().readinto(b)
183 self.read_history.append(res)
184 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000186class CMockFileIO(MockFileIO, io.BytesIO):
187 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000189class PyMockFileIO(MockFileIO, pyio.BytesIO):
190 pass
191
192
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000193class MockUnseekableIO:
194 def seekable(self):
195 return False
196
197 def seek(self, *args):
198 raise self.UnsupportedOperation("not seekable")
199
200 def tell(self, *args):
201 raise self.UnsupportedOperation("not seekable")
202
203class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
204 UnsupportedOperation = io.UnsupportedOperation
205
206class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
207 UnsupportedOperation = pyio.UnsupportedOperation
208
209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210class MockNonBlockWriterIO:
211
212 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000213 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 def pop_written(self):
217 s = b"".join(self._write_stack)
218 self._write_stack[:] = []
219 return s
220
221 def block_on(self, char):
222 """Block when a given char is encountered."""
223 self._blocker_char = char
224
225 def readable(self):
226 return True
227
228 def seekable(self):
229 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000230
Guido van Rossum01a27522007-03-07 01:00:12 +0000231 def writable(self):
232 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000233
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000234 def write(self, b):
235 b = bytes(b)
236 n = -1
237 if self._blocker_char:
238 try:
239 n = b.index(self._blocker_char)
240 except ValueError:
241 pass
242 else:
243 self._blocker_char = None
244 self._write_stack.append(b[:n])
245 raise self.BlockingIOError(0, "test blocking", n)
246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Guido van Rossuma9e20242007-03-08 00:43:48 +0000255
Guido van Rossum28524c72007-02-27 05:47:44 +0000256class IOTest(unittest.TestCase):
257
Neal Norwitze7789b12008-03-24 06:18:09 +0000258 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000259 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000260
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000261 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000262 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263
Guido van Rossum28524c72007-02-27 05:47:44 +0000264 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000273 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000274 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000278 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000281
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285
Guido van Rossum9b76da62007-04-11 01:09:03 +0000286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000289 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000302 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000306 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
Guido van Rossum34d69e52007-04-10 20:08:41 +0000314 LARGE = 2**31
315
Guido van Rossum53807da2007-04-10 19:01:47 +0000316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000321 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000324 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 self.assertEqual(f.read(2), b"x")
332
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000335 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000336 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000337 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000338 self.assertRaises(exc, fp.read)
339 self.assertRaises(exc, fp.readline)
340 with self.open(support.TESTFN, "wb", buffering=0) as fp:
341 self.assertRaises(exc, fp.read)
342 self.assertRaises(exc, fp.readline)
343 with self.open(support.TESTFN, "rb", buffering=0) as fp:
344 self.assertRaises(exc, fp.write, b"blah")
345 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000346 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 self.assertRaises(exc, fp.write, b"blah")
348 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.write, "blah")
351 self.assertRaises(exc, fp.writelines, ["blah\n"])
352 # Non-zero seeking from current or end pos
353 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
354 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000355
Guido van Rossum28524c72007-02-27 05:47:44 +0000356 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000357 with self.open(support.TESTFN, "wb", buffering=0) as f:
358 self.assertEqual(f.readable(), False)
359 self.assertEqual(f.writable(), True)
360 self.assertEqual(f.seekable(), True)
361 self.write_ops(f)
362 with self.open(support.TESTFN, "rb", buffering=0) as f:
363 self.assertEqual(f.readable(), True)
364 self.assertEqual(f.writable(), False)
365 self.assertEqual(f.seekable(), True)
366 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000367
Guido van Rossum87429772007-04-10 21:06:59 +0000368 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000369 with self.open(support.TESTFN, "wb") as f:
370 self.assertEqual(f.readable(), False)
371 self.assertEqual(f.writable(), True)
372 self.assertEqual(f.seekable(), True)
373 self.write_ops(f)
374 with self.open(support.TESTFN, "rb") as f:
375 self.assertEqual(f.readable(), True)
376 self.assertEqual(f.writable(), False)
377 self.assertEqual(f.seekable(), True)
378 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000379
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000380 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000381 with self.open(support.TESTFN, "wb") as f:
382 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
383 with self.open(support.TESTFN, "rb") as f:
384 self.assertEqual(f.readline(), b"abc\n")
385 self.assertEqual(f.readline(10), b"def\n")
386 self.assertEqual(f.readline(2), b"xy")
387 self.assertEqual(f.readline(4), b"zzy\n")
388 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000389 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000390 self.assertRaises(TypeError, f.readline, 5.3)
391 with self.open(support.TESTFN, "r") as f:
392 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000393
Guido van Rossum28524c72007-02-27 05:47:44 +0000394 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000395 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 self.write_ops(f)
397 data = f.getvalue()
398 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000400 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000401
Guido van Rossum53807da2007-04-10 19:01:47 +0000402 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000403 # On Windows and Mac OSX this test comsumes large resources; It takes
404 # a long time to build the >2GB file and takes >2GB of disk space
405 # therefore the resource must be enabled to run this test.
406 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000407 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000408 print("\nTesting large file ops skipped on %s." % sys.platform,
409 file=sys.stderr)
410 print("It requires %d bytes and a long time." % self.LARGE,
411 file=sys.stderr)
412 print("Use 'regrtest.py -u largefile test_io' to run it.",
413 file=sys.stderr)
414 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 with self.open(support.TESTFN, "w+b", 0) as f:
416 self.large_file_ops(f)
417 with self.open(support.TESTFN, "w+b") as f:
418 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000419
420 def test_with_open(self):
421 for bufsize in (0, 1, 100):
422 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000423 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000424 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000425 self.assertEqual(f.closed, True)
426 f = None
427 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000428 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000429 1/0
430 except ZeroDivisionError:
431 self.assertEqual(f.closed, True)
432 else:
433 self.fail("1/0 didn't raise an exception")
434
Antoine Pitrou08838b62009-01-21 00:55:13 +0000435 # issue 5008
436 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000437 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000438 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000444 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000445
Guido van Rossum87429772007-04-10 21:06:59 +0000446 def test_destructor(self):
447 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000448 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000449 def __del__(self):
450 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 try:
452 f = super().__del__
453 except AttributeError:
454 pass
455 else:
456 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def close(self):
458 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000460 def flush(self):
461 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000462 super().flush()
463 f = MyFileIO(support.TESTFN, "wb")
464 f.write(b"xxx")
465 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000466 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000468 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000469 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470
471 def _check_base_destructor(self, base):
472 record = []
473 class MyIO(base):
474 def __init__(self):
475 # This exercises the availability of attributes on object
476 # destruction.
477 # (in the C version, close() is called by the tp_dealloc
478 # function, not by __del__)
479 self.on_del = 1
480 self.on_close = 2
481 self.on_flush = 3
482 def __del__(self):
483 record.append(self.on_del)
484 try:
485 f = super().__del__
486 except AttributeError:
487 pass
488 else:
489 f()
490 def close(self):
491 record.append(self.on_close)
492 super().close()
493 def flush(self):
494 record.append(self.on_flush)
495 super().flush()
496 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000497 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000498 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000499 self.assertEqual(record, [1, 2, 3])
500
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000501 def test_IOBase_destructor(self):
502 self._check_base_destructor(self.IOBase)
503
504 def test_RawIOBase_destructor(self):
505 self._check_base_destructor(self.RawIOBase)
506
507 def test_BufferedIOBase_destructor(self):
508 self._check_base_destructor(self.BufferedIOBase)
509
510 def test_TextIOBase_destructor(self):
511 self._check_base_destructor(self.TextIOBase)
512
Guido van Rossum87429772007-04-10 21:06:59 +0000513 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000514 with self.open(support.TESTFN, "wb") as f:
515 f.write(b"xxx")
516 with self.open(support.TESTFN, "rb") as f:
517 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000518
Guido van Rossumd4103952007-04-12 05:44:49 +0000519 def test_array_writes(self):
520 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000521 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000522 with self.open(support.TESTFN, "wb", 0) as f:
523 self.assertEqual(f.write(a), n)
524 with self.open(support.TESTFN, "wb") as f:
525 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000526
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000527 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000529 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 def test_read_closed(self):
532 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000533 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 with self.open(support.TESTFN, "r") as f:
535 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 self.assertEqual(file.read(), "egg\n")
537 file.seek(0)
538 file.close()
539 self.assertRaises(ValueError, file.read)
540
541 def test_no_closefd_with_filename(self):
542 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000544
545 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000547 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000550 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000551 self.assertEqual(file.buffer.raw.closefd, False)
552
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 def test_garbage_collection(self):
554 # FileIO objects are collected, and collecting them flushes
555 # all data to disk.
556 f = self.FileIO(support.TESTFN, "wb")
557 f.write(b"abcxxx")
558 f.f = f
559 wr = weakref.ref(f)
560 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000561 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000562 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000563 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000565
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000566 def test_unbounded_file(self):
567 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
568 zero = "/dev/zero"
569 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000570 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000571 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000572 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000573 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000575 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000576 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000578 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
581
Antoine Pitrou6be88762010-05-03 16:48:20 +0000582 def test_flush_error_on_close(self):
583 f = self.open(support.TESTFN, "wb", buffering=0)
584 def bad_flush():
585 raise IOError()
586 f.flush = bad_flush
587 self.assertRaises(IOError, f.close) # exception not swallowed
588
589 def test_multi_close(self):
590 f = self.open(support.TESTFN, "wb", buffering=0)
591 f.close()
592 f.close()
593 f.close()
594 self.assertRaises(ValueError, f.flush)
595
Antoine Pitrou328ec742010-09-14 18:37:24 +0000596 def test_RawIOBase_read(self):
597 # Exercise the default RawIOBase.read() implementation (which calls
598 # readinto() internally).
599 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
600 self.assertEqual(rawio.read(2), b"ab")
601 self.assertEqual(rawio.read(2), b"c")
602 self.assertEqual(rawio.read(2), b"d")
603 self.assertEqual(rawio.read(2), None)
604 self.assertEqual(rawio.read(2), b"ef")
605 self.assertEqual(rawio.read(2), b"g")
606 self.assertEqual(rawio.read(2), None)
607 self.assertEqual(rawio.read(2), b"")
608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609class CIOTest(IOTest):
610 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000611
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612class PyIOTest(IOTest):
613 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000614
Guido van Rossuma9e20242007-03-08 00:43:48 +0000615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616class CommonBufferedTests:
617 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
618
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000619 def test_detach(self):
620 raw = self.MockRawIO()
621 buf = self.tp(raw)
622 self.assertIs(buf.detach(), raw)
623 self.assertRaises(ValueError, buf.detach)
624
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 def test_fileno(self):
626 rawio = self.MockRawIO()
627 bufio = self.tp(rawio)
628
629 self.assertEquals(42, bufio.fileno())
630
631 def test_no_fileno(self):
632 # XXX will we always have fileno() function? If so, kill
633 # this test. Else, write it.
634 pass
635
636 def test_invalid_args(self):
637 rawio = self.MockRawIO()
638 bufio = self.tp(rawio)
639 # Invalid whence
640 self.assertRaises(ValueError, bufio.seek, 0, -1)
641 self.assertRaises(ValueError, bufio.seek, 0, 3)
642
643 def test_override_destructor(self):
644 tp = self.tp
645 record = []
646 class MyBufferedIO(tp):
647 def __del__(self):
648 record.append(1)
649 try:
650 f = super().__del__
651 except AttributeError:
652 pass
653 else:
654 f()
655 def close(self):
656 record.append(2)
657 super().close()
658 def flush(self):
659 record.append(3)
660 super().flush()
661 rawio = self.MockRawIO()
662 bufio = MyBufferedIO(rawio)
663 writable = bufio.writable()
664 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000665 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666 if writable:
667 self.assertEqual(record, [1, 2, 3])
668 else:
669 self.assertEqual(record, [1, 2])
670
671 def test_context_manager(self):
672 # Test usability as a context manager
673 rawio = self.MockRawIO()
674 bufio = self.tp(rawio)
675 def _with():
676 with bufio:
677 pass
678 _with()
679 # bufio should now be closed, and using it a second time should raise
680 # a ValueError.
681 self.assertRaises(ValueError, _with)
682
683 def test_error_through_destructor(self):
684 # Test that the exception state is not modified by a destructor,
685 # even if close() fails.
686 rawio = self.CloseFailureIO()
687 def f():
688 self.tp(rawio).xyzzy
689 with support.captured_output("stderr") as s:
690 self.assertRaises(AttributeError, f)
691 s = s.getvalue().strip()
692 if s:
693 # The destructor *may* have printed an unraisable error, check it
694 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000695 self.assertTrue(s.startswith("Exception IOError: "), s)
696 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000697
Antoine Pitrou716c4442009-05-23 19:04:03 +0000698 def test_repr(self):
699 raw = self.MockRawIO()
700 b = self.tp(raw)
701 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
702 self.assertEqual(repr(b), "<%s>" % clsname)
703 raw.name = "dummy"
704 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
705 raw.name = b"dummy"
706 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
707
Antoine Pitrou6be88762010-05-03 16:48:20 +0000708 def test_flush_error_on_close(self):
709 raw = self.MockRawIO()
710 def bad_flush():
711 raise IOError()
712 raw.flush = bad_flush
713 b = self.tp(raw)
714 self.assertRaises(IOError, b.close) # exception not swallowed
715
716 def test_multi_close(self):
717 raw = self.MockRawIO()
718 b = self.tp(raw)
719 b.close()
720 b.close()
721 b.close()
722 self.assertRaises(ValueError, b.flush)
723
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000724 def test_unseekable(self):
725 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
726 self.assertRaises(self.UnsupportedOperation, bufio.tell)
727 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
728
Guido van Rossum78892e42007-04-06 17:31:18 +0000729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000730class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
731 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 def test_constructor(self):
734 rawio = self.MockRawIO([b"abc"])
735 bufio = self.tp(rawio)
736 bufio.__init__(rawio)
737 bufio.__init__(rawio, buffer_size=1024)
738 bufio.__init__(rawio, buffer_size=16)
739 self.assertEquals(b"abc", bufio.read())
740 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
741 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
742 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
743 rawio = self.MockRawIO([b"abc"])
744 bufio.__init__(rawio)
745 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000746
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000747 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000748 for arg in (None, 7):
749 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
750 bufio = self.tp(rawio)
751 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000752 # Invalid args
753 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000755 def test_read1(self):
756 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
757 bufio = self.tp(rawio)
758 self.assertEquals(b"a", bufio.read(1))
759 self.assertEquals(b"b", bufio.read1(1))
760 self.assertEquals(rawio._reads, 1)
761 self.assertEquals(b"c", bufio.read1(100))
762 self.assertEquals(rawio._reads, 1)
763 self.assertEquals(b"d", bufio.read1(100))
764 self.assertEquals(rawio._reads, 2)
765 self.assertEquals(b"efg", bufio.read1(100))
766 self.assertEquals(rawio._reads, 3)
767 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000768 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769 # Invalid args
770 self.assertRaises(ValueError, bufio.read1, -1)
771
772 def test_readinto(self):
773 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
774 bufio = self.tp(rawio)
775 b = bytearray(2)
776 self.assertEquals(bufio.readinto(b), 2)
777 self.assertEquals(b, b"ab")
778 self.assertEquals(bufio.readinto(b), 2)
779 self.assertEquals(b, b"cd")
780 self.assertEquals(bufio.readinto(b), 2)
781 self.assertEquals(b, b"ef")
782 self.assertEquals(bufio.readinto(b), 1)
783 self.assertEquals(b, b"gf")
784 self.assertEquals(bufio.readinto(b), 0)
785 self.assertEquals(b, b"gf")
786
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000787 def test_readlines(self):
788 def bufio():
789 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
790 return self.tp(rawio)
791 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
792 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
793 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
794
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000795 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000796 data = b"abcdefghi"
797 dlen = len(data)
798
799 tests = [
800 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
801 [ 100, [ 3, 3, 3], [ dlen ] ],
802 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
803 ]
804
805 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000806 rawio = self.MockFileIO(data)
807 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000808 pos = 0
809 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000810 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000811 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000812 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000813 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000814
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000815 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000816 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000817 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
818 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000819
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000820 self.assertEquals(b"abcd", bufio.read(6))
821 self.assertEquals(b"e", bufio.read(1))
822 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000823 self.assertEquals(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000824 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000825 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000826
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000827 def test_read_past_eof(self):
828 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
829 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000830
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000831 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000832
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000833 def test_read_all(self):
834 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
835 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000836
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000837 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000838
Victor Stinner45df8202010-04-28 22:31:17 +0000839 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000840 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000842 try:
843 # Write out many bytes with exactly the same number of 0's,
844 # 1's... 255's. This will help us check that concurrent reading
845 # doesn't duplicate or forget contents.
846 N = 1000
847 l = list(range(256)) * N
848 random.shuffle(l)
849 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000850 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000851 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000852 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000853 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000854 errors = []
855 results = []
856 def f():
857 try:
858 # Intra-buffer read then buffer-flushing read
859 for n in cycle([1, 19]):
860 s = bufio.read(n)
861 if not s:
862 break
863 # list.append() is atomic
864 results.append(s)
865 except Exception as e:
866 errors.append(e)
867 raise
868 threads = [threading.Thread(target=f) for x in range(20)]
869 for t in threads:
870 t.start()
871 time.sleep(0.02) # yield
872 for t in threads:
873 t.join()
874 self.assertFalse(errors,
875 "the following exceptions were caught: %r" % errors)
876 s = b''.join(results)
877 for i in range(256):
878 c = bytes(bytearray([i]))
879 self.assertEqual(s.count(c), N)
880 finally:
881 support.unlink(support.TESTFN)
882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 def test_misbehaved_io(self):
884 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
885 bufio = self.tp(rawio)
886 self.assertRaises(IOError, bufio.seek, 0)
887 self.assertRaises(IOError, bufio.tell)
888
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000889 def test_no_extraneous_read(self):
890 # Issue #9550; when the raw IO object has satisfied the read request,
891 # we should not issue any additional reads, otherwise it may block
892 # (e.g. socket).
893 bufsize = 16
894 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
895 rawio = self.MockRawIO([b"x" * n])
896 bufio = self.tp(rawio, bufsize)
897 self.assertEqual(bufio.read(n), b"x" * n)
898 # Simple case: one raw read is enough to satisfy the request.
899 self.assertEqual(rawio._extraneous_reads, 0,
900 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
901 # A more complex case where two raw reads are needed to satisfy
902 # the request.
903 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
904 bufio = self.tp(rawio, bufsize)
905 self.assertEqual(bufio.read(n), b"x" * n)
906 self.assertEqual(rawio._extraneous_reads, 0,
907 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
908
909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000910class CBufferedReaderTest(BufferedReaderTest):
911 tp = io.BufferedReader
912
913 def test_constructor(self):
914 BufferedReaderTest.test_constructor(self)
915 # The allocation can succeed on 32-bit builds, e.g. with more
916 # than 2GB RAM and a 64-bit kernel.
917 if sys.maxsize > 0x7FFFFFFF:
918 rawio = self.MockRawIO()
919 bufio = self.tp(rawio)
920 self.assertRaises((OverflowError, MemoryError, ValueError),
921 bufio.__init__, rawio, sys.maxsize)
922
923 def test_initialization(self):
924 rawio = self.MockRawIO([b"abc"])
925 bufio = self.tp(rawio)
926 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
927 self.assertRaises(ValueError, bufio.read)
928 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
929 self.assertRaises(ValueError, bufio.read)
930 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
931 self.assertRaises(ValueError, bufio.read)
932
933 def test_misbehaved_io_read(self):
934 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
935 bufio = self.tp(rawio)
936 # _pyio.BufferedReader seems to implement reading different, so that
937 # checking this is not so easy.
938 self.assertRaises(IOError, bufio.read, 10)
939
940 def test_garbage_collection(self):
941 # C BufferedReader objects are collected.
942 # The Python version has __del__, so it ends into gc.garbage instead
943 rawio = self.FileIO(support.TESTFN, "w+b")
944 f = self.tp(rawio)
945 f.f = f
946 wr = weakref.ref(f)
947 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000948 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000949 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000950
951class PyBufferedReaderTest(BufferedReaderTest):
952 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000953
Guido van Rossuma9e20242007-03-08 00:43:48 +0000954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000955class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
956 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000958 def test_constructor(self):
959 rawio = self.MockRawIO()
960 bufio = self.tp(rawio)
961 bufio.__init__(rawio)
962 bufio.__init__(rawio, buffer_size=1024)
963 bufio.__init__(rawio, buffer_size=16)
964 self.assertEquals(3, bufio.write(b"abc"))
965 bufio.flush()
966 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
967 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
968 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
969 bufio.__init__(rawio)
970 self.assertEquals(3, bufio.write(b"ghi"))
971 bufio.flush()
972 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
973
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000974 def test_detach_flush(self):
975 raw = self.MockRawIO()
976 buf = self.tp(raw)
977 buf.write(b"howdy!")
978 self.assertFalse(raw._write_stack)
979 buf.detach()
980 self.assertEqual(raw._write_stack, [b"howdy!"])
981
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000982 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000983 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000984 writer = self.MockRawIO()
985 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000986 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000987 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 def test_write_overflow(self):
990 writer = self.MockRawIO()
991 bufio = self.tp(writer, 8)
992 contents = b"abcdefghijklmnop"
993 for n in range(0, len(contents), 3):
994 bufio.write(contents[n:n+3])
995 flushed = b"".join(writer._write_stack)
996 # At least (total - 8) bytes were implicitly flushed, perhaps more
997 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000998 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000999
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001000 def check_writes(self, intermediate_func):
1001 # Lots of writes, test the flushed output is as expected.
1002 contents = bytes(range(256)) * 1000
1003 n = 0
1004 writer = self.MockRawIO()
1005 bufio = self.tp(writer, 13)
1006 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1007 def gen_sizes():
1008 for size in count(1):
1009 for i in range(15):
1010 yield size
1011 sizes = gen_sizes()
1012 while n < len(contents):
1013 size = min(next(sizes), len(contents) - n)
1014 self.assertEquals(bufio.write(contents[n:n+size]), size)
1015 intermediate_func(bufio)
1016 n += size
1017 bufio.flush()
1018 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001019
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001020 def test_writes(self):
1021 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001022
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001023 def test_writes_and_flushes(self):
1024 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001025
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026 def test_writes_and_seeks(self):
1027 def _seekabs(bufio):
1028 pos = bufio.tell()
1029 bufio.seek(pos + 1, 0)
1030 bufio.seek(pos - 1, 0)
1031 bufio.seek(pos, 0)
1032 self.check_writes(_seekabs)
1033 def _seekrel(bufio):
1034 pos = bufio.seek(0, 1)
1035 bufio.seek(+1, 1)
1036 bufio.seek(-1, 1)
1037 bufio.seek(pos, 0)
1038 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 def test_writes_and_truncates(self):
1041 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001043 def test_write_non_blocking(self):
1044 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001045 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001047 self.assertEquals(bufio.write(b"abcd"), 4)
1048 self.assertEquals(bufio.write(b"efghi"), 5)
1049 # 1 byte will be written, the rest will be buffered
1050 raw.block_on(b"k")
1051 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001052
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001053 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1054 raw.block_on(b"0")
1055 try:
1056 bufio.write(b"opqrwxyz0123456789")
1057 except self.BlockingIOError as e:
1058 written = e.characters_written
1059 else:
1060 self.fail("BlockingIOError should have been raised")
1061 self.assertEquals(written, 16)
1062 self.assertEquals(raw.pop_written(),
1063 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001064
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
1066 s = raw.pop_written()
1067 # Previously buffered bytes were flushed
1068 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001069
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070 def test_write_and_rewind(self):
1071 raw = io.BytesIO()
1072 bufio = self.tp(raw, 4)
1073 self.assertEqual(bufio.write(b"abcdef"), 6)
1074 self.assertEqual(bufio.tell(), 6)
1075 bufio.seek(0, 0)
1076 self.assertEqual(bufio.write(b"XY"), 2)
1077 bufio.seek(6, 0)
1078 self.assertEqual(raw.getvalue(), b"XYcdef")
1079 self.assertEqual(bufio.write(b"123456"), 6)
1080 bufio.flush()
1081 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001082
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001083 def test_flush(self):
1084 writer = self.MockRawIO()
1085 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001086 bufio.write(b"abc")
1087 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001088 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 def test_destructor(self):
1091 writer = self.MockRawIO()
1092 bufio = self.tp(writer, 8)
1093 bufio.write(b"abc")
1094 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001095 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 self.assertEquals(b"abc", writer._write_stack[0])
1097
1098 def test_truncate(self):
1099 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001100 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001101 bufio = self.tp(raw, 8)
1102 bufio.write(b"abcdef")
1103 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001104 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001105 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001106 self.assertEqual(f.read(), b"abc")
1107
Victor Stinner45df8202010-04-28 22:31:17 +00001108 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001109 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001111 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001112 # Write out many bytes from many threads and test they were
1113 # all flushed.
1114 N = 1000
1115 contents = bytes(range(256)) * N
1116 sizes = cycle([1, 19])
1117 n = 0
1118 queue = deque()
1119 while n < len(contents):
1120 size = next(sizes)
1121 queue.append(contents[n:n+size])
1122 n += size
1123 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001124 # We use a real file object because it allows us to
1125 # exercise situations where the GIL is released before
1126 # writing the buffer to the raw streams. This is in addition
1127 # to concurrency issues due to switching threads in the middle
1128 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001129 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001130 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001131 errors = []
1132 def f():
1133 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001134 while True:
1135 try:
1136 s = queue.popleft()
1137 except IndexError:
1138 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001139 bufio.write(s)
1140 except Exception as e:
1141 errors.append(e)
1142 raise
1143 threads = [threading.Thread(target=f) for x in range(20)]
1144 for t in threads:
1145 t.start()
1146 time.sleep(0.02) # yield
1147 for t in threads:
1148 t.join()
1149 self.assertFalse(errors,
1150 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001151 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001152 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001153 s = f.read()
1154 for i in range(256):
1155 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001156 finally:
1157 support.unlink(support.TESTFN)
1158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001159 def test_misbehaved_io(self):
1160 rawio = self.MisbehavedRawIO()
1161 bufio = self.tp(rawio, 5)
1162 self.assertRaises(IOError, bufio.seek, 0)
1163 self.assertRaises(IOError, bufio.tell)
1164 self.assertRaises(IOError, bufio.write, b"abcdef")
1165
Benjamin Peterson59406a92009-03-26 17:10:29 +00001166 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001167 with support.check_warnings(("max_buffer_size is deprecated",
1168 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001169 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001170
1171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172class CBufferedWriterTest(BufferedWriterTest):
1173 tp = io.BufferedWriter
1174
1175 def test_constructor(self):
1176 BufferedWriterTest.test_constructor(self)
1177 # The allocation can succeed on 32-bit builds, e.g. with more
1178 # than 2GB RAM and a 64-bit kernel.
1179 if sys.maxsize > 0x7FFFFFFF:
1180 rawio = self.MockRawIO()
1181 bufio = self.tp(rawio)
1182 self.assertRaises((OverflowError, MemoryError, ValueError),
1183 bufio.__init__, rawio, sys.maxsize)
1184
1185 def test_initialization(self):
1186 rawio = self.MockRawIO()
1187 bufio = self.tp(rawio)
1188 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1189 self.assertRaises(ValueError, bufio.write, b"def")
1190 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1191 self.assertRaises(ValueError, bufio.write, b"def")
1192 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1193 self.assertRaises(ValueError, bufio.write, b"def")
1194
1195 def test_garbage_collection(self):
1196 # C BufferedWriter objects are collected, and collecting them flushes
1197 # all data to disk.
1198 # The Python version has __del__, so it ends into gc.garbage instead
1199 rawio = self.FileIO(support.TESTFN, "w+b")
1200 f = self.tp(rawio)
1201 f.write(b"123xxx")
1202 f.x = f
1203 wr = weakref.ref(f)
1204 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001205 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001206 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001207 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001208 self.assertEqual(f.read(), b"123xxx")
1209
1210
1211class PyBufferedWriterTest(BufferedWriterTest):
1212 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001213
Guido van Rossum01a27522007-03-07 01:00:12 +00001214class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001215
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001216 def test_constructor(self):
1217 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001218 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001219
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001220 def test_detach(self):
1221 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1222 self.assertRaises(self.UnsupportedOperation, pair.detach)
1223
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001224 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001225 with support.check_warnings(("max_buffer_size is deprecated",
1226 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001227 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001228
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001229 def test_constructor_with_not_readable(self):
1230 class NotReadable(MockRawIO):
1231 def readable(self):
1232 return False
1233
1234 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1235
1236 def test_constructor_with_not_writeable(self):
1237 class NotWriteable(MockRawIO):
1238 def writable(self):
1239 return False
1240
1241 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1242
1243 def test_read(self):
1244 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1245
1246 self.assertEqual(pair.read(3), b"abc")
1247 self.assertEqual(pair.read(1), b"d")
1248 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001249 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1250 self.assertEqual(pair.read(None), b"abc")
1251
1252 def test_readlines(self):
1253 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1254 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1255 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1256 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001257
1258 def test_read1(self):
1259 # .read1() is delegated to the underlying reader object, so this test
1260 # can be shallow.
1261 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1262
1263 self.assertEqual(pair.read1(3), b"abc")
1264
1265 def test_readinto(self):
1266 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1267
1268 data = bytearray(5)
1269 self.assertEqual(pair.readinto(data), 5)
1270 self.assertEqual(data, b"abcde")
1271
1272 def test_write(self):
1273 w = self.MockRawIO()
1274 pair = self.tp(self.MockRawIO(), w)
1275
1276 pair.write(b"abc")
1277 pair.flush()
1278 pair.write(b"def")
1279 pair.flush()
1280 self.assertEqual(w._write_stack, [b"abc", b"def"])
1281
1282 def test_peek(self):
1283 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1284
1285 self.assertTrue(pair.peek(3).startswith(b"abc"))
1286 self.assertEqual(pair.read(3), b"abc")
1287
1288 def test_readable(self):
1289 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1290 self.assertTrue(pair.readable())
1291
1292 def test_writeable(self):
1293 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1294 self.assertTrue(pair.writable())
1295
1296 def test_seekable(self):
1297 # BufferedRWPairs are never seekable, even if their readers and writers
1298 # are.
1299 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1300 self.assertFalse(pair.seekable())
1301
1302 # .flush() is delegated to the underlying writer object and has been
1303 # tested in the test_write method.
1304
1305 def test_close_and_closed(self):
1306 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1307 self.assertFalse(pair.closed)
1308 pair.close()
1309 self.assertTrue(pair.closed)
1310
1311 def test_isatty(self):
1312 class SelectableIsAtty(MockRawIO):
1313 def __init__(self, isatty):
1314 MockRawIO.__init__(self)
1315 self._isatty = isatty
1316
1317 def isatty(self):
1318 return self._isatty
1319
1320 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1321 self.assertFalse(pair.isatty())
1322
1323 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1324 self.assertTrue(pair.isatty())
1325
1326 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1327 self.assertTrue(pair.isatty())
1328
1329 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1330 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001331
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001332class CBufferedRWPairTest(BufferedRWPairTest):
1333 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001334
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001335class PyBufferedRWPairTest(BufferedRWPairTest):
1336 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001337
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001338
1339class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1340 read_mode = "rb+"
1341 write_mode = "wb+"
1342
1343 def test_constructor(self):
1344 BufferedReaderTest.test_constructor(self)
1345 BufferedWriterTest.test_constructor(self)
1346
1347 def test_read_and_write(self):
1348 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001349 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001350
1351 self.assertEqual(b"as", rw.read(2))
1352 rw.write(b"ddd")
1353 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001354 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001355 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001356 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001357
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001358 def test_seek_and_tell(self):
1359 raw = self.BytesIO(b"asdfghjkl")
1360 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001361
1362 self.assertEquals(b"as", rw.read(2))
1363 self.assertEquals(2, rw.tell())
1364 rw.seek(0, 0)
1365 self.assertEquals(b"asdf", rw.read(4))
1366
1367 rw.write(b"asdf")
1368 rw.seek(0, 0)
1369 self.assertEquals(b"asdfasdfl", rw.read())
1370 self.assertEquals(9, rw.tell())
1371 rw.seek(-4, 2)
1372 self.assertEquals(5, rw.tell())
1373 rw.seek(2, 1)
1374 self.assertEquals(7, rw.tell())
1375 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001376 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001377
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001378 def check_flush_and_read(self, read_func):
1379 raw = self.BytesIO(b"abcdefghi")
1380 bufio = self.tp(raw)
1381
1382 self.assertEquals(b"ab", read_func(bufio, 2))
1383 bufio.write(b"12")
1384 self.assertEquals(b"ef", read_func(bufio, 2))
1385 self.assertEquals(6, bufio.tell())
1386 bufio.flush()
1387 self.assertEquals(6, bufio.tell())
1388 self.assertEquals(b"ghi", read_func(bufio))
1389 raw.seek(0, 0)
1390 raw.write(b"XYZ")
1391 # flush() resets the read buffer
1392 bufio.flush()
1393 bufio.seek(0, 0)
1394 self.assertEquals(b"XYZ", read_func(bufio, 3))
1395
1396 def test_flush_and_read(self):
1397 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1398
1399 def test_flush_and_readinto(self):
1400 def _readinto(bufio, n=-1):
1401 b = bytearray(n if n >= 0 else 9999)
1402 n = bufio.readinto(b)
1403 return bytes(b[:n])
1404 self.check_flush_and_read(_readinto)
1405
1406 def test_flush_and_peek(self):
1407 def _peek(bufio, n=-1):
1408 # This relies on the fact that the buffer can contain the whole
1409 # raw stream, otherwise peek() can return less.
1410 b = bufio.peek(n)
1411 if n != -1:
1412 b = b[:n]
1413 bufio.seek(len(b), 1)
1414 return b
1415 self.check_flush_and_read(_peek)
1416
1417 def test_flush_and_write(self):
1418 raw = self.BytesIO(b"abcdefghi")
1419 bufio = self.tp(raw)
1420
1421 bufio.write(b"123")
1422 bufio.flush()
1423 bufio.write(b"45")
1424 bufio.flush()
1425 bufio.seek(0, 0)
1426 self.assertEquals(b"12345fghi", raw.getvalue())
1427 self.assertEquals(b"12345fghi", bufio.read())
1428
1429 def test_threads(self):
1430 BufferedReaderTest.test_threads(self)
1431 BufferedWriterTest.test_threads(self)
1432
1433 def test_writes_and_peek(self):
1434 def _peek(bufio):
1435 bufio.peek(1)
1436 self.check_writes(_peek)
1437 def _peek(bufio):
1438 pos = bufio.tell()
1439 bufio.seek(-1, 1)
1440 bufio.peek(1)
1441 bufio.seek(pos, 0)
1442 self.check_writes(_peek)
1443
1444 def test_writes_and_reads(self):
1445 def _read(bufio):
1446 bufio.seek(-1, 1)
1447 bufio.read(1)
1448 self.check_writes(_read)
1449
1450 def test_writes_and_read1s(self):
1451 def _read1(bufio):
1452 bufio.seek(-1, 1)
1453 bufio.read1(1)
1454 self.check_writes(_read1)
1455
1456 def test_writes_and_readintos(self):
1457 def _read(bufio):
1458 bufio.seek(-1, 1)
1459 bufio.readinto(bytearray(1))
1460 self.check_writes(_read)
1461
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001462 def test_write_after_readahead(self):
1463 # Issue #6629: writing after the buffer was filled by readahead should
1464 # first rewind the raw stream.
1465 for overwrite_size in [1, 5]:
1466 raw = self.BytesIO(b"A" * 10)
1467 bufio = self.tp(raw, 4)
1468 # Trigger readahead
1469 self.assertEqual(bufio.read(1), b"A")
1470 self.assertEqual(bufio.tell(), 1)
1471 # Overwriting should rewind the raw stream if it needs so
1472 bufio.write(b"B" * overwrite_size)
1473 self.assertEqual(bufio.tell(), overwrite_size + 1)
1474 # If the write size was smaller than the buffer size, flush() and
1475 # check that rewind happens.
1476 bufio.flush()
1477 self.assertEqual(bufio.tell(), overwrite_size + 1)
1478 s = raw.getvalue()
1479 self.assertEqual(s,
1480 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1481
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001482 def test_truncate_after_read_or_write(self):
1483 raw = self.BytesIO(b"A" * 10)
1484 bufio = self.tp(raw, 100)
1485 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1486 self.assertEqual(bufio.truncate(), 2)
1487 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1488 self.assertEqual(bufio.truncate(), 4)
1489
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001490 def test_misbehaved_io(self):
1491 BufferedReaderTest.test_misbehaved_io(self)
1492 BufferedWriterTest.test_misbehaved_io(self)
1493
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001494 # You can't construct a BufferedRandom over a non-seekable stream.
1495 test_unseekable = None
1496
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001497class CBufferedRandomTest(BufferedRandomTest):
1498 tp = io.BufferedRandom
1499
1500 def test_constructor(self):
1501 BufferedRandomTest.test_constructor(self)
1502 # The allocation can succeed on 32-bit builds, e.g. with more
1503 # than 2GB RAM and a 64-bit kernel.
1504 if sys.maxsize > 0x7FFFFFFF:
1505 rawio = self.MockRawIO()
1506 bufio = self.tp(rawio)
1507 self.assertRaises((OverflowError, MemoryError, ValueError),
1508 bufio.__init__, rawio, sys.maxsize)
1509
1510 def test_garbage_collection(self):
1511 CBufferedReaderTest.test_garbage_collection(self)
1512 CBufferedWriterTest.test_garbage_collection(self)
1513
1514class PyBufferedRandomTest(BufferedRandomTest):
1515 tp = pyio.BufferedRandom
1516
1517
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001518# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1519# properties:
1520# - A single output character can correspond to many bytes of input.
1521# - The number of input bytes to complete the character can be
1522# undetermined until the last input byte is received.
1523# - The number of input bytes can vary depending on previous input.
1524# - A single input byte can correspond to many characters of output.
1525# - The number of output characters can be undetermined until the
1526# last input byte is received.
1527# - The number of output characters can vary depending on previous input.
1528
1529class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1530 """
1531 For testing seek/tell behavior with a stateful, buffering decoder.
1532
1533 Input is a sequence of words. Words may be fixed-length (length set
1534 by input) or variable-length (period-terminated). In variable-length
1535 mode, extra periods are ignored. Possible words are:
1536 - 'i' followed by a number sets the input length, I (maximum 99).
1537 When I is set to 0, words are space-terminated.
1538 - 'o' followed by a number sets the output length, O (maximum 99).
1539 - Any other word is converted into a word followed by a period on
1540 the output. The output word consists of the input word truncated
1541 or padded out with hyphens to make its length equal to O. If O
1542 is 0, the word is output verbatim without truncating or padding.
1543 I and O are initially set to 1. When I changes, any buffered input is
1544 re-scanned according to the new I. EOF also terminates the last word.
1545 """
1546
1547 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001548 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001549 self.reset()
1550
1551 def __repr__(self):
1552 return '<SID %x>' % id(self)
1553
1554 def reset(self):
1555 self.i = 1
1556 self.o = 1
1557 self.buffer = bytearray()
1558
1559 def getstate(self):
1560 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1561 return bytes(self.buffer), i*100 + o
1562
1563 def setstate(self, state):
1564 buffer, io = state
1565 self.buffer = bytearray(buffer)
1566 i, o = divmod(io, 100)
1567 self.i, self.o = i ^ 1, o ^ 1
1568
1569 def decode(self, input, final=False):
1570 output = ''
1571 for b in input:
1572 if self.i == 0: # variable-length, terminated with period
1573 if b == ord('.'):
1574 if self.buffer:
1575 output += self.process_word()
1576 else:
1577 self.buffer.append(b)
1578 else: # fixed-length, terminate after self.i bytes
1579 self.buffer.append(b)
1580 if len(self.buffer) == self.i:
1581 output += self.process_word()
1582 if final and self.buffer: # EOF terminates the last word
1583 output += self.process_word()
1584 return output
1585
1586 def process_word(self):
1587 output = ''
1588 if self.buffer[0] == ord('i'):
1589 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1590 elif self.buffer[0] == ord('o'):
1591 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1592 else:
1593 output = self.buffer.decode('ascii')
1594 if len(output) < self.o:
1595 output += '-'*self.o # pad out with hyphens
1596 if self.o:
1597 output = output[:self.o] # truncate to output length
1598 output += '.'
1599 self.buffer = bytearray()
1600 return output
1601
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001602 codecEnabled = False
1603
1604 @classmethod
1605 def lookupTestDecoder(cls, name):
1606 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001607 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001608 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001609 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001610 incrementalencoder=None,
1611 streamreader=None, streamwriter=None,
1612 incrementaldecoder=cls)
1613
1614# Register the previous decoder for testing.
1615# Disabled by default, tests will enable it.
1616codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1617
1618
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001619class StatefulIncrementalDecoderTest(unittest.TestCase):
1620 """
1621 Make sure the StatefulIncrementalDecoder actually works.
1622 """
1623
1624 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001625 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001626 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001627 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001628 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001629 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001630 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001631 # I=0, O=6 (variable-length input, fixed-length output)
1632 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1633 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001634 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001635 # I=6, O=3 (fixed-length input > fixed-length output)
1636 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1637 # I=0, then 3; O=29, then 15 (with longer output)
1638 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1639 'a----------------------------.' +
1640 'b----------------------------.' +
1641 'cde--------------------------.' +
1642 'abcdefghijabcde.' +
1643 'a.b------------.' +
1644 '.c.------------.' +
1645 'd.e------------.' +
1646 'k--------------.' +
1647 'l--------------.' +
1648 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001649 ]
1650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001652 # Try a few one-shot test cases.
1653 for input, eof, output in self.test_cases:
1654 d = StatefulIncrementalDecoder()
1655 self.assertEquals(d.decode(input, eof), output)
1656
1657 # Also test an unfinished decode, followed by forcing EOF.
1658 d = StatefulIncrementalDecoder()
1659 self.assertEquals(d.decode(b'oiabcd'), '')
1660 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001661
1662class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001663
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001664 def setUp(self):
1665 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1666 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001667 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001668
Guido van Rossumd0712812007-04-11 16:32:43 +00001669 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001670 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001671
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001672 def test_constructor(self):
1673 r = self.BytesIO(b"\xc3\xa9\n\n")
1674 b = self.BufferedReader(r, 1000)
1675 t = self.TextIOWrapper(b)
1676 t.__init__(b, encoding="latin1", newline="\r\n")
1677 self.assertEquals(t.encoding, "latin1")
1678 self.assertEquals(t.line_buffering, False)
1679 t.__init__(b, encoding="utf8", line_buffering=True)
1680 self.assertEquals(t.encoding, "utf8")
1681 self.assertEquals(t.line_buffering, True)
1682 self.assertEquals("\xe9\n", t.readline())
1683 self.assertRaises(TypeError, t.__init__, b, newline=42)
1684 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1685
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001686 def test_detach(self):
1687 r = self.BytesIO()
1688 b = self.BufferedWriter(r)
1689 t = self.TextIOWrapper(b)
1690 self.assertIs(t.detach(), b)
1691
1692 t = self.TextIOWrapper(b, encoding="ascii")
1693 t.write("howdy")
1694 self.assertFalse(r.getvalue())
1695 t.detach()
1696 self.assertEqual(r.getvalue(), b"howdy")
1697 self.assertRaises(ValueError, t.detach)
1698
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001699 def test_repr(self):
1700 raw = self.BytesIO("hello".encode("utf-8"))
1701 b = self.BufferedReader(raw)
1702 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001703 modname = self.TextIOWrapper.__module__
1704 self.assertEqual(repr(t),
1705 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1706 raw.name = "dummy"
1707 self.assertEqual(repr(t),
1708 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1709 raw.name = b"dummy"
1710 self.assertEqual(repr(t),
1711 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001712
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001713 def test_line_buffering(self):
1714 r = self.BytesIO()
1715 b = self.BufferedWriter(r, 1000)
1716 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001717 t.write("X")
1718 self.assertEquals(r.getvalue(), b"") # No flush happened
1719 t.write("Y\nZ")
1720 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1721 t.write("A\rB")
1722 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1723
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 def test_encoding(self):
1725 # Check the encoding attribute is always set, and valid
1726 b = self.BytesIO()
1727 t = self.TextIOWrapper(b, encoding="utf8")
1728 self.assertEqual(t.encoding, "utf8")
1729 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001730 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001731 codecs.lookup(t.encoding)
1732
1733 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001734 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001735 b = self.BytesIO(b"abc\n\xff\n")
1736 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001737 self.assertRaises(UnicodeError, t.read)
1738 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739 b = self.BytesIO(b"abc\n\xff\n")
1740 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001741 self.assertRaises(UnicodeError, t.read)
1742 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 b = self.BytesIO(b"abc\n\xff\n")
1744 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001745 self.assertEquals(t.read(), "abc\n\n")
1746 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 b = self.BytesIO(b"abc\n\xff\n")
1748 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001749 self.assertEquals(t.read(), "abc\n\ufffd\n")
1750
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001752 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 b = self.BytesIO()
1754 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001755 self.assertRaises(UnicodeError, t.write, "\xff")
1756 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001757 b = self.BytesIO()
1758 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001759 self.assertRaises(UnicodeError, t.write, "\xff")
1760 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761 b = self.BytesIO()
1762 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001763 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001764 t.write("abc\xffdef\n")
1765 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001766 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001767 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001768 b = self.BytesIO()
1769 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001770 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001771 t.write("abc\xffdef\n")
1772 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001773 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001774
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001776 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1777
1778 tests = [
1779 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001780 [ '', input_lines ],
1781 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1782 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1783 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001784 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001785 encodings = (
1786 'utf-8', 'latin-1',
1787 'utf-16', 'utf-16-le', 'utf-16-be',
1788 'utf-32', 'utf-32-le', 'utf-32-be',
1789 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001790
Guido van Rossum8358db22007-08-18 21:39:55 +00001791 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001792 # character in TextIOWrapper._pending_line.
1793 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001794 # XXX: str.encode() should return bytes
1795 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001796 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001797 for bufsize in range(1, 10):
1798 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1800 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001801 encoding=encoding)
1802 if do_reads:
1803 got_lines = []
1804 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001805 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001806 if c2 == '':
1807 break
1808 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001809 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001810 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001811 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001812
1813 for got_line, exp_line in zip(got_lines, exp_lines):
1814 self.assertEquals(got_line, exp_line)
1815 self.assertEquals(len(got_lines), len(exp_lines))
1816
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001817 def test_newlines_input(self):
1818 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001819 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1820 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001821 (None, normalized.decode("ascii").splitlines(True)),
1822 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001823 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1824 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1825 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001826 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001827 buf = self.BytesIO(testdata)
1828 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001829 self.assertEquals(txt.readlines(), expected)
1830 txt.seek(0)
1831 self.assertEquals(txt.read(), "".join(expected))
1832
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001833 def test_newlines_output(self):
1834 testdict = {
1835 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1836 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1837 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1838 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1839 }
1840 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1841 for newline, expected in tests:
1842 buf = self.BytesIO()
1843 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1844 txt.write("AAA\nB")
1845 txt.write("BB\nCCC\n")
1846 txt.write("X\rY\r\nZ")
1847 txt.flush()
1848 self.assertEquals(buf.closed, False)
1849 self.assertEquals(buf.getvalue(), expected)
1850
1851 def test_destructor(self):
1852 l = []
1853 base = self.BytesIO
1854 class MyBytesIO(base):
1855 def close(self):
1856 l.append(self.getvalue())
1857 base.close(self)
1858 b = MyBytesIO()
1859 t = self.TextIOWrapper(b, encoding="ascii")
1860 t.write("abc")
1861 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001862 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001863 self.assertEquals([b"abc"], l)
1864
1865 def test_override_destructor(self):
1866 record = []
1867 class MyTextIO(self.TextIOWrapper):
1868 def __del__(self):
1869 record.append(1)
1870 try:
1871 f = super().__del__
1872 except AttributeError:
1873 pass
1874 else:
1875 f()
1876 def close(self):
1877 record.append(2)
1878 super().close()
1879 def flush(self):
1880 record.append(3)
1881 super().flush()
1882 b = self.BytesIO()
1883 t = MyTextIO(b, encoding="ascii")
1884 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001885 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001886 self.assertEqual(record, [1, 2, 3])
1887
1888 def test_error_through_destructor(self):
1889 # Test that the exception state is not modified by a destructor,
1890 # even if close() fails.
1891 rawio = self.CloseFailureIO()
1892 def f():
1893 self.TextIOWrapper(rawio).xyzzy
1894 with support.captured_output("stderr") as s:
1895 self.assertRaises(AttributeError, f)
1896 s = s.getvalue().strip()
1897 if s:
1898 # The destructor *may* have printed an unraisable error, check it
1899 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001900 self.assertTrue(s.startswith("Exception IOError: "), s)
1901 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001902
Guido van Rossum9b76da62007-04-11 01:09:03 +00001903 # Systematic tests of the text I/O API
1904
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001905 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001906 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1907 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001908 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001909 f._CHUNK_SIZE = chunksize
1910 self.assertEquals(f.write("abc"), 3)
1911 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001912 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001913 f._CHUNK_SIZE = chunksize
1914 self.assertEquals(f.tell(), 0)
1915 self.assertEquals(f.read(), "abc")
1916 cookie = f.tell()
1917 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001918 self.assertEquals(f.read(None), "abc")
1919 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001920 self.assertEquals(f.read(2), "ab")
1921 self.assertEquals(f.read(1), "c")
1922 self.assertEquals(f.read(1), "")
1923 self.assertEquals(f.read(), "")
1924 self.assertEquals(f.tell(), cookie)
1925 self.assertEquals(f.seek(0), 0)
1926 self.assertEquals(f.seek(0, 2), cookie)
1927 self.assertEquals(f.write("def"), 3)
1928 self.assertEquals(f.seek(cookie), cookie)
1929 self.assertEquals(f.read(), "def")
1930 if enc.startswith("utf"):
1931 self.multi_line_test(f, enc)
1932 f.close()
1933
1934 def multi_line_test(self, f, enc):
1935 f.seek(0)
1936 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001937 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001938 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001939 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001940 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001941 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001942 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001943 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001944 wlines.append((f.tell(), line))
1945 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001946 f.seek(0)
1947 rlines = []
1948 while True:
1949 pos = f.tell()
1950 line = f.readline()
1951 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001952 break
1953 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001954 self.assertEquals(rlines, wlines)
1955
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001956 def test_telling(self):
1957 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001958 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001959 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001960 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001961 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001962 p2 = f.tell()
1963 f.seek(0)
1964 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001965 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001966 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001967 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001968 self.assertEquals(f.tell(), p2)
1969 f.seek(0)
1970 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001971 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001972 self.assertRaises(IOError, f.tell)
1973 self.assertEquals(f.tell(), p2)
1974 f.close()
1975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 def test_seeking(self):
1977 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001978 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001979 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001980 prefix = bytes(u_prefix.encode("utf-8"))
1981 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001982 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001983 suffix = bytes(u_suffix.encode("utf-8"))
1984 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001985 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001986 f.write(line*2)
1987 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001988 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001989 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001990 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001991 self.assertEquals(f.tell(), prefix_size)
1992 self.assertEquals(f.readline(), u_suffix)
1993
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001994 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001995 # Regression test for a specific bug
1996 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001997 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001998 f.write(data)
1999 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002000 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00002001 f._CHUNK_SIZE # Just test that it exists
2002 f._CHUNK_SIZE = 2
2003 f.readline()
2004 f.tell()
2005
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002006 def test_seek_and_tell(self):
2007 #Test seek/tell using the StatefulIncrementalDecoder.
2008 # Make test faster by doing smaller seeks
2009 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002010
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002011 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002012 """Tell/seek to various points within a data stream and ensure
2013 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002014 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002015 f.write(data)
2016 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002017 f = self.open(support.TESTFN, encoding='test_decoder')
2018 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002019 decoded = f.read()
2020 f.close()
2021
Neal Norwitze2b07052008-03-18 19:52:05 +00002022 for i in range(min_pos, len(decoded) + 1): # seek positions
2023 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002025 self.assertEquals(f.read(i), decoded[:i])
2026 cookie = f.tell()
2027 self.assertEquals(f.read(j), decoded[i:i + j])
2028 f.seek(cookie)
2029 self.assertEquals(f.read(), decoded[i:])
2030 f.close()
2031
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002032 # Enable the test decoder.
2033 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002034
2035 # Run the tests.
2036 try:
2037 # Try each test case.
2038 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002039 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002040
2041 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002042 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2043 offset = CHUNK_SIZE - len(input)//2
2044 prefix = b'.'*offset
2045 # Don't bother seeking into the prefix (takes too long).
2046 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002047 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002048
2049 # Ensure our test decoder won't interfere with subsequent tests.
2050 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002051 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002052
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002053 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002054 data = "1234567890"
2055 tests = ("utf-16",
2056 "utf-16-le",
2057 "utf-16-be",
2058 "utf-32",
2059 "utf-32-le",
2060 "utf-32-be")
2061 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002062 buf = self.BytesIO()
2063 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002064 # Check if the BOM is written only once (see issue1753).
2065 f.write(data)
2066 f.write(data)
2067 f.seek(0)
2068 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002069 f.seek(0)
2070 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002071 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2072
Benjamin Petersona1b49012009-03-31 23:11:32 +00002073 def test_unreadable(self):
2074 class UnReadable(self.BytesIO):
2075 def readable(self):
2076 return False
2077 txt = self.TextIOWrapper(UnReadable())
2078 self.assertRaises(IOError, txt.read)
2079
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002080 def test_read_one_by_one(self):
2081 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002082 reads = ""
2083 while True:
2084 c = txt.read(1)
2085 if not c:
2086 break
2087 reads += c
2088 self.assertEquals(reads, "AA\nBB")
2089
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002090 def test_readlines(self):
2091 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2092 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2093 txt.seek(0)
2094 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2095 txt.seek(0)
2096 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2097
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002098 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002099 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002100 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002101 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002102 reads = ""
2103 while True:
2104 c = txt.read(128)
2105 if not c:
2106 break
2107 reads += c
2108 self.assertEquals(reads, "A"*127+"\nB")
2109
2110 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002111 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002112
2113 # read one char at a time
2114 reads = ""
2115 while True:
2116 c = txt.read(1)
2117 if not c:
2118 break
2119 reads += c
2120 self.assertEquals(reads, self.normalized)
2121
2122 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002123 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002124 txt._CHUNK_SIZE = 4
2125
2126 reads = ""
2127 while True:
2128 c = txt.read(4)
2129 if not c:
2130 break
2131 reads += c
2132 self.assertEquals(reads, self.normalized)
2133
2134 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002136 txt._CHUNK_SIZE = 4
2137
2138 reads = txt.read(4)
2139 reads += txt.read(4)
2140 reads += txt.readline()
2141 reads += txt.readline()
2142 reads += txt.readline()
2143 self.assertEquals(reads, self.normalized)
2144
2145 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002147 txt._CHUNK_SIZE = 4
2148
2149 reads = txt.read(4)
2150 reads += txt.read()
2151 self.assertEquals(reads, self.normalized)
2152
2153 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002155 txt._CHUNK_SIZE = 4
2156
2157 reads = txt.read(4)
2158 pos = txt.tell()
2159 txt.seek(0)
2160 txt.seek(pos)
2161 self.assertEquals(txt.read(4), "BBB\n")
2162
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002163 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002164 buffer = self.BytesIO(self.testdata)
2165 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002166
2167 self.assertEqual(buffer.seekable(), txt.seekable())
2168
Antoine Pitroue4501852009-05-14 18:55:55 +00002169 def test_append_bom(self):
2170 # The BOM is not written again when appending to a non-empty file
2171 filename = support.TESTFN
2172 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2173 with self.open(filename, 'w', encoding=charset) as f:
2174 f.write('aaa')
2175 pos = f.tell()
2176 with self.open(filename, 'rb') as f:
2177 self.assertEquals(f.read(), 'aaa'.encode(charset))
2178
2179 with self.open(filename, 'a', encoding=charset) as f:
2180 f.write('xxx')
2181 with self.open(filename, 'rb') as f:
2182 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2183
2184 def test_seek_bom(self):
2185 # Same test, but when seeking manually
2186 filename = support.TESTFN
2187 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2188 with self.open(filename, 'w', encoding=charset) as f:
2189 f.write('aaa')
2190 pos = f.tell()
2191 with self.open(filename, 'r+', encoding=charset) as f:
2192 f.seek(pos)
2193 f.write('zzz')
2194 f.seek(0)
2195 f.write('bbb')
2196 with self.open(filename, 'rb') as f:
2197 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2198
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002199 def test_errors_property(self):
2200 with self.open(support.TESTFN, "w") as f:
2201 self.assertEqual(f.errors, "strict")
2202 with self.open(support.TESTFN, "w", errors="replace") as f:
2203 self.assertEqual(f.errors, "replace")
2204
Victor Stinner45df8202010-04-28 22:31:17 +00002205 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002206 def test_threads_write(self):
2207 # Issue6750: concurrent writes could duplicate data
2208 event = threading.Event()
2209 with self.open(support.TESTFN, "w", buffering=1) as f:
2210 def run(n):
2211 text = "Thread%03d\n" % n
2212 event.wait()
2213 f.write(text)
2214 threads = [threading.Thread(target=lambda n=x: run(n))
2215 for x in range(20)]
2216 for t in threads:
2217 t.start()
2218 time.sleep(0.02)
2219 event.set()
2220 for t in threads:
2221 t.join()
2222 with self.open(support.TESTFN) as f:
2223 content = f.read()
2224 for n in range(20):
2225 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2226
Antoine Pitrou6be88762010-05-03 16:48:20 +00002227 def test_flush_error_on_close(self):
2228 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2229 def bad_flush():
2230 raise IOError()
2231 txt.flush = bad_flush
2232 self.assertRaises(IOError, txt.close) # exception not swallowed
2233
2234 def test_multi_close(self):
2235 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2236 txt.close()
2237 txt.close()
2238 txt.close()
2239 self.assertRaises(ValueError, txt.flush)
2240
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002241 def test_unseekable(self):
2242 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2243 self.assertRaises(self.UnsupportedOperation, txt.tell)
2244 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2245
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002246class CTextIOWrapperTest(TextIOWrapperTest):
2247
2248 def test_initialization(self):
2249 r = self.BytesIO(b"\xc3\xa9\n\n")
2250 b = self.BufferedReader(r, 1000)
2251 t = self.TextIOWrapper(b)
2252 self.assertRaises(TypeError, t.__init__, b, newline=42)
2253 self.assertRaises(ValueError, t.read)
2254 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2255 self.assertRaises(ValueError, t.read)
2256
2257 def test_garbage_collection(self):
2258 # C TextIOWrapper objects are collected, and collecting them flushes
2259 # all data to disk.
2260 # The Python version has __del__, so it ends in gc.garbage instead.
2261 rawio = io.FileIO(support.TESTFN, "wb")
2262 b = self.BufferedWriter(rawio)
2263 t = self.TextIOWrapper(b, encoding="ascii")
2264 t.write("456def")
2265 t.x = t
2266 wr = weakref.ref(t)
2267 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002268 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002269 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002270 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 self.assertEqual(f.read(), b"456def")
2272
2273class PyTextIOWrapperTest(TextIOWrapperTest):
2274 pass
2275
2276
2277class IncrementalNewlineDecoderTest(unittest.TestCase):
2278
2279 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002280 # UTF-8 specific tests for a newline decoder
2281 def _check_decode(b, s, **kwargs):
2282 # We exercise getstate() / setstate() as well as decode()
2283 state = decoder.getstate()
2284 self.assertEquals(decoder.decode(b, **kwargs), s)
2285 decoder.setstate(state)
2286 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287
Antoine Pitrou180a3362008-12-14 16:36:46 +00002288 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002289
Antoine Pitrou180a3362008-12-14 16:36:46 +00002290 _check_decode(b'\xe8', "")
2291 _check_decode(b'\xa2', "")
2292 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002293
Antoine Pitrou180a3362008-12-14 16:36:46 +00002294 _check_decode(b'\xe8', "")
2295 _check_decode(b'\xa2', "")
2296 _check_decode(b'\x88', "\u8888")
2297
2298 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002299 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2300
Antoine Pitrou180a3362008-12-14 16:36:46 +00002301 decoder.reset()
2302 _check_decode(b'\n', "\n")
2303 _check_decode(b'\r', "")
2304 _check_decode(b'', "\n", final=True)
2305 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002306
Antoine Pitrou180a3362008-12-14 16:36:46 +00002307 _check_decode(b'\r', "")
2308 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002309
Antoine Pitrou180a3362008-12-14 16:36:46 +00002310 _check_decode(b'\r\r\n', "\n\n")
2311 _check_decode(b'\r', "")
2312 _check_decode(b'\r', "\n")
2313 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002314
Antoine Pitrou180a3362008-12-14 16:36:46 +00002315 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2316 _check_decode(b'\xe8\xa2\x88', "\u8888")
2317 _check_decode(b'\n', "\n")
2318 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2319 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002320
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002322 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002323 if encoding is not None:
2324 encoder = codecs.getincrementalencoder(encoding)()
2325 def _decode_bytewise(s):
2326 # Decode one byte at a time
2327 for b in encoder.encode(s):
2328 result.append(decoder.decode(bytes([b])))
2329 else:
2330 encoder = None
2331 def _decode_bytewise(s):
2332 # Decode one char at a time
2333 for c in s:
2334 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002335 self.assertEquals(decoder.newlines, None)
2336 _decode_bytewise("abc\n\r")
2337 self.assertEquals(decoder.newlines, '\n')
2338 _decode_bytewise("\nabc")
2339 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2340 _decode_bytewise("abc\r")
2341 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2342 _decode_bytewise("abc")
2343 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2344 _decode_bytewise("abc\r")
2345 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2346 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002347 input = "abc"
2348 if encoder is not None:
2349 encoder.reset()
2350 input = encoder.encode(input)
2351 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002352 self.assertEquals(decoder.newlines, None)
2353
2354 def test_newline_decoder(self):
2355 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002356 # None meaning the IncrementalNewlineDecoder takes unicode input
2357 # rather than bytes input
2358 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002359 'utf-16', 'utf-16-le', 'utf-16-be',
2360 'utf-32', 'utf-32-le', 'utf-32-be',
2361 )
2362 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002363 decoder = enc and codecs.getincrementaldecoder(enc)()
2364 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2365 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002366 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2368 self.check_newline_decoding_utf8(decoder)
2369
Antoine Pitrou66913e22009-03-06 23:40:56 +00002370 def test_newline_bytes(self):
2371 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2372 def _check(dec):
2373 self.assertEquals(dec.newlines, None)
2374 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2375 self.assertEquals(dec.newlines, None)
2376 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2377 self.assertEquals(dec.newlines, None)
2378 dec = self.IncrementalNewlineDecoder(None, translate=False)
2379 _check(dec)
2380 dec = self.IncrementalNewlineDecoder(None, translate=True)
2381 _check(dec)
2382
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002383class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2384 pass
2385
2386class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2387 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002388
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002389
Guido van Rossum01a27522007-03-07 01:00:12 +00002390# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002391
Guido van Rossum5abbf752007-08-27 17:39:33 +00002392class MiscIOTest(unittest.TestCase):
2393
Barry Warsaw40e82462008-11-20 20:14:50 +00002394 def tearDown(self):
2395 support.unlink(support.TESTFN)
2396
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002397 def test___all__(self):
2398 for name in self.io.__all__:
2399 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002400 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002401 if name == "open":
2402 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002403 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002404 self.assertTrue(issubclass(obj, Exception), name)
2405 elif not name.startswith("SEEK_"):
2406 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002407
Barry Warsaw40e82462008-11-20 20:14:50 +00002408 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002409 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002410 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002411 f.close()
2412
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002413 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002414 self.assertEquals(f.name, support.TESTFN)
2415 self.assertEquals(f.buffer.name, support.TESTFN)
2416 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2417 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002418 self.assertEquals(f.buffer.mode, "rb")
2419 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002420 f.close()
2421
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002422 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002423 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002424 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2425 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002426
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002427 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002428 self.assertEquals(g.mode, "wb")
2429 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002430 self.assertEquals(g.name, f.fileno())
2431 self.assertEquals(g.raw.name, f.fileno())
2432 f.close()
2433 g.close()
2434
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002435 def test_io_after_close(self):
2436 for kwargs in [
2437 {"mode": "w"},
2438 {"mode": "wb"},
2439 {"mode": "w", "buffering": 1},
2440 {"mode": "w", "buffering": 2},
2441 {"mode": "wb", "buffering": 0},
2442 {"mode": "r"},
2443 {"mode": "rb"},
2444 {"mode": "r", "buffering": 1},
2445 {"mode": "r", "buffering": 2},
2446 {"mode": "rb", "buffering": 0},
2447 {"mode": "w+"},
2448 {"mode": "w+b"},
2449 {"mode": "w+", "buffering": 1},
2450 {"mode": "w+", "buffering": 2},
2451 {"mode": "w+b", "buffering": 0},
2452 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002453 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002454 f.close()
2455 self.assertRaises(ValueError, f.flush)
2456 self.assertRaises(ValueError, f.fileno)
2457 self.assertRaises(ValueError, f.isatty)
2458 self.assertRaises(ValueError, f.__iter__)
2459 if hasattr(f, "peek"):
2460 self.assertRaises(ValueError, f.peek, 1)
2461 self.assertRaises(ValueError, f.read)
2462 if hasattr(f, "read1"):
2463 self.assertRaises(ValueError, f.read1, 1024)
2464 if hasattr(f, "readinto"):
2465 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2466 self.assertRaises(ValueError, f.readline)
2467 self.assertRaises(ValueError, f.readlines)
2468 self.assertRaises(ValueError, f.seek, 0)
2469 self.assertRaises(ValueError, f.tell)
2470 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002471 self.assertRaises(ValueError, f.write,
2472 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002473 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002476 def test_blockingioerror(self):
2477 # Various BlockingIOError issues
2478 self.assertRaises(TypeError, self.BlockingIOError)
2479 self.assertRaises(TypeError, self.BlockingIOError, 1)
2480 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2481 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2482 b = self.BlockingIOError(1, "")
2483 self.assertEqual(b.characters_written, 0)
2484 class C(str):
2485 pass
2486 c = C("")
2487 b = self.BlockingIOError(1, c)
2488 c.b = b
2489 b.c = c
2490 wr = weakref.ref(c)
2491 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002492 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002493 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002494
2495 def test_abcs(self):
2496 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002497 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2498 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2499 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2500 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002501
2502 def _check_abc_inheritance(self, abcmodule):
2503 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002504 self.assertIsInstance(f, abcmodule.IOBase)
2505 self.assertIsInstance(f, abcmodule.RawIOBase)
2506 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2507 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002508 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002509 self.assertIsInstance(f, abcmodule.IOBase)
2510 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2511 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2512 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002513 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002514 self.assertIsInstance(f, abcmodule.IOBase)
2515 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2516 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2517 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002518
2519 def test_abc_inheritance(self):
2520 # Test implementations inherit from their respective ABCs
2521 self._check_abc_inheritance(self)
2522
2523 def test_abc_inheritance_official(self):
2524 # Test implementations inherit from the official ABCs of the
2525 # baseline "io" module.
2526 self._check_abc_inheritance(io)
2527
2528class CMiscIOTest(MiscIOTest):
2529 io = io
2530
2531class PyMiscIOTest(MiscIOTest):
2532 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002533
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002534
2535@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2536class SignalsTest(unittest.TestCase):
2537
2538 def setUp(self):
2539 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2540
2541 def tearDown(self):
2542 signal.signal(signal.SIGALRM, self.oldalrm)
2543
2544 def alarm_interrupt(self, sig, frame):
2545 1/0
2546
2547 @unittest.skipUnless(threading, 'Threading required for this test.')
2548 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2549 """Check that a partial write, when it gets interrupted, properly
2550 invokes the signal handler."""
2551 read_results = []
2552 def _read():
2553 s = os.read(r, 1)
2554 read_results.append(s)
2555 t = threading.Thread(target=_read)
2556 t.daemon = True
2557 r, w = os.pipe()
2558 try:
2559 wio = self.io.open(w, **fdopen_kwargs)
2560 t.start()
2561 signal.alarm(1)
2562 # Fill the pipe enough that the write will be blocking.
2563 # It will be interrupted by the timer armed above. Since the
2564 # other thread has read one byte, the low-level write will
2565 # return with a successful (partial) result rather than an EINTR.
2566 # The buffered IO layer must check for pending signal
2567 # handlers, which in this case will invoke alarm_interrupt().
2568 self.assertRaises(ZeroDivisionError,
2569 wio.write, item * (1024 * 1024))
2570 t.join()
2571 # We got one byte, get another one and check that it isn't a
2572 # repeat of the first one.
2573 read_results.append(os.read(r, 1))
2574 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2575 finally:
2576 os.close(w)
2577 os.close(r)
2578 # This is deliberate. If we didn't close the file descriptor
2579 # before closing wio, wio would try to flush its internal
2580 # buffer, and block again.
2581 try:
2582 wio.close()
2583 except IOError as e:
2584 if e.errno != errno.EBADF:
2585 raise
2586
2587 def test_interrupted_write_unbuffered(self):
2588 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2589
2590 def test_interrupted_write_buffered(self):
2591 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2592
2593 def test_interrupted_write_text(self):
2594 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2595
2596class CSignalsTest(SignalsTest):
2597 io = io
2598
2599class PySignalsTest(SignalsTest):
2600 io = pyio
2601
2602
Guido van Rossum28524c72007-02-27 05:47:44 +00002603def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002604 tests = (CIOTest, PyIOTest,
2605 CBufferedReaderTest, PyBufferedReaderTest,
2606 CBufferedWriterTest, PyBufferedWriterTest,
2607 CBufferedRWPairTest, PyBufferedRWPairTest,
2608 CBufferedRandomTest, PyBufferedRandomTest,
2609 StatefulIncrementalDecoderTest,
2610 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2611 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002612 CMiscIOTest, PyMiscIOTest,
2613 CSignalsTest, PySignalsTest,
2614 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002615
2616 # Put the namespaces of the IO module we are testing and some useful mock
2617 # classes in the __dict__ of each test.
2618 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002619 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002620 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2621 c_io_ns = {name : getattr(io, name) for name in all_members}
2622 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2623 globs = globals()
2624 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2625 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2626 # Avoid turning open into a bound method.
2627 py_io_ns["open"] = pyio.OpenWrapper
2628 for test in tests:
2629 if test.__name__.startswith("C"):
2630 for name, obj in c_io_ns.items():
2631 setattr(test, name, obj)
2632 elif test.__name__.startswith("Py"):
2633 for name, obj in py_io_ns.items():
2634 setattr(test, name, obj)
2635
2636 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002637
2638if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002639 test_main()