blob: dfddfb54a7fff280c940ac365651013069226233 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Georg Brandl1b37e872010-03-14 10:45:50 +000032from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000033from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000039try:
40 import threading
41except ImportError:
42 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000043
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000045def _default_chunk_size():
46 """Get the default TextIOWrapper chunk size"""
47 with open(__file__, "r", encoding="latin1") as f:
48 return f._CHUNK_SIZE
49
50
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is an iterable of chunks handed out one at a time by
    readinto(); a ``None`` entry makes readinto() return ``None`` once.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read calls, and how many of them happened after
        # the read stack had already been exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Store an immutable copy so later mutation of ``b`` by the caller
        # cannot alter what was recorded.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy (part of) the next queued chunk into ``buf``.

        Returns the number of bytes copied, ``None`` when the next queued
        entry is ``None``, or 0 once the queue is empty.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk is larger than the buffer: fill the buffer and keep the
        # remainder at the head of the queue for the next call.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
106
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # Mock bound to the C implementation's RawIOBase.
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # Mock bound to the pure-Python implementation's RawIOBase.
    pass
112
113
class MockRawIO(MockRawIOWithoutRead):
    """Raw mock that adds an explicit read() on top of the readinto()-only
    base: each call hands out the next queued chunk whole."""

    def read(self, n=None):
        # ``n`` is ignored; the next queued chunk is returned unchanged.
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (e.g. KeyboardInterrupt) must not be swallowed.
            self._extraneous_reads += 1
            return b""
123
class CMockRawIO(MockRawIO, io.RawIOBase):
    # Mock bound to the C implementation's RawIOBase.
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # Mock bound to the pure-Python implementation's RawIOBase.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000129
Guido van Rossuma9e20242007-03-08 00:43:48 +0000130
class MisbehavedRawIO(MockRawIO):
    """Raw mock that deliberately returns inconsistent results: doubled
    write counts and read data, negative positions, and a readinto() count
    larger than the buffer it was given."""

    def write(self, b):
        # Claims twice as many bytes were written as actually were.
        return super().write(b) * 2

    def read(self, n=None):
        # Returns the queued chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Performs the real readinto() but reports five times the buffer
        # length instead of the true count.
        super().readinto(buf)
        return len(buf) * 5
147
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # Misbehaving mock bound to the C implementation's RawIOBase.
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # Misbehaving mock bound to the pure-Python implementation's RawIOBase.
    pass
153
154
class CloseFailureIO(MockRawIO):
    """Raw mock whose first close() raises IOError.

    The ``closed`` flag is set before raising, so subsequent close() calls
    are silent no-ops.
    """

    closed = 0

    def close(self):
        if not self.closed:
            self.closed = 1
            raise IOError
162
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # Close-failure mock bound to the C implementation's RawIOBase.
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # Close-failure mock bound to the pure-Python implementation's RawIOBase.
    pass
168
169
class MockFileIO:
    """Mixin that records the size of every read in ``read_history``.

    It must be combined with a concrete stream class (the subclasses in this
    file mix it with BytesIO); read() and readinto() delegate to that class
    through super().
    """

    def __init__(self, data):
        # Set up the history first so it exists even if the base
        # initializer triggers a read.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        # Record the length of the data, or None when the read returned None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        # readinto() already returns a count (or None); record it as is.
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000185
class CMockFileIO(MockFileIO, io.BytesIO):
    # Read-recording mock on top of the C BytesIO.
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # Read-recording mock on top of the pure-Python BytesIO.
    pass
191
192
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek() and tell() raise ``self.UnsupportedOperation``, which the
    concrete subclass must provide as a class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")
202
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type raised by the mixin's seek()/tell() for the C flavour.
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type raised by the mixin's seek()/tell() for the Python flavour.
    UnsupportedOperation = pyio.UnsupportedOperation
208
209
class MockNonBlockWriterIO:
    """Writable raw mock that can simulate a blocking write.

    After block_on(char), the next write() whose data contains ``char``
    records only the bytes before it and raises ``self.BlockingIOError``
    (supplied by the concrete subclass) with that byte count.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present in this chunk: write normally below.
                pass
            else:
                # The blocker is one-shot: disarm it, keep the bytes that
                # precede it, and report how many were accepted.
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)
248
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Exception type raised by the mock's write() for the C flavour.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Exception type raised by the mock's write() for the Python flavour.
    BlockingIOError = pyio.BlockingIOError
254
Guido van Rossuma9e20242007-03-08 00:43:48 +0000255
class IOTest(unittest.TestCase):
    # General tests of open(), FileIO, BytesIO and the abstract base classes.
    #
    # NOTE(review): the methods read attributes such as self.open,
    # self.BytesIO, self.FileIO, self.IOBase, self.UnsupportedOperation,
    # self.SEEK_CUR and self.MockRawIOWithoutRead that are not defined in
    # this class; presumably they are injected per implementation (C / Python)
    # elsewhere in the file -- confirm.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate scenario; leaves b"hello world\n" in f.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek scenario; expects f to contain
        # b"hello world\n".  The argument-less read() checks only run when
        # ``buffered`` is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek/write/truncate scenario around the LARGE offset.
        # assertTrue instead of a bare assert so the checks are not
        # stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, both on normal
        # exit and when the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, then close(), then
        # flush(), in that order, and the data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for an arbitrary base class.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object can only be reclaimed by the cycle
        # collector.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
608
class CIOTest(IOTest):
    # IOTest variant intended for the C implementation of io.
    pass


class PyIOTest(IOTest):
    # IOTest variant intended for the pure-Python implementation of io.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000614
Guido van Rossuma9e20242007-03-08 00:43:48 +0000615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616class CommonBufferedTests:
617 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
618
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000619 def test_detach(self):
620 raw = self.MockRawIO()
621 buf = self.tp(raw)
622 self.assertIs(buf.detach(), raw)
623 self.assertRaises(ValueError, buf.detach)
624
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 def test_fileno(self):
626 rawio = self.MockRawIO()
627 bufio = self.tp(rawio)
628
629 self.assertEquals(42, bufio.fileno())
630
631 def test_no_fileno(self):
632 # XXX will we always have fileno() function? If so, kill
633 # this test. Else, write it.
634 pass
635
636 def test_invalid_args(self):
637 rawio = self.MockRawIO()
638 bufio = self.tp(rawio)
639 # Invalid whence
640 self.assertRaises(ValueError, bufio.seek, 0, -1)
641 self.assertRaises(ValueError, bufio.seek, 0, 3)
642
643 def test_override_destructor(self):
644 tp = self.tp
645 record = []
646 class MyBufferedIO(tp):
647 def __del__(self):
648 record.append(1)
649 try:
650 f = super().__del__
651 except AttributeError:
652 pass
653 else:
654 f()
655 def close(self):
656 record.append(2)
657 super().close()
658 def flush(self):
659 record.append(3)
660 super().flush()
661 rawio = self.MockRawIO()
662 bufio = MyBufferedIO(rawio)
663 writable = bufio.writable()
664 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000665 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666 if writable:
667 self.assertEqual(record, [1, 2, 3])
668 else:
669 self.assertEqual(record, [1, 2])
670
671 def test_context_manager(self):
672 # Test usability as a context manager
673 rawio = self.MockRawIO()
674 bufio = self.tp(rawio)
675 def _with():
676 with bufio:
677 pass
678 _with()
679 # bufio should now be closed, and using it a second time should raise
680 # a ValueError.
681 self.assertRaises(ValueError, _with)
682
683 def test_error_through_destructor(self):
684 # Test that the exception state is not modified by a destructor,
685 # even if close() fails.
686 rawio = self.CloseFailureIO()
687 def f():
688 self.tp(rawio).xyzzy
689 with support.captured_output("stderr") as s:
690 self.assertRaises(AttributeError, f)
691 s = s.getvalue().strip()
692 if s:
693 # The destructor *may* have printed an unraisable error, check it
694 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000695 self.assertTrue(s.startswith("Exception IOError: "), s)
696 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000697
Antoine Pitrou716c4442009-05-23 19:04:03 +0000698 def test_repr(self):
699 raw = self.MockRawIO()
700 b = self.tp(raw)
701 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
702 self.assertEqual(repr(b), "<%s>" % clsname)
703 raw.name = "dummy"
704 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
705 raw.name = b"dummy"
706 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
707
def test_flush_error_on_close(self):
    # An IOError raised by the raw stream's flush() must surface from
    # close() instead of being swallowed.
    def failing_flush():
        raise IOError()

    raw = self.MockRawIO()
    raw.flush = failing_flush
    buffered = self.tp(raw)
    with self.assertRaises(IOError):
        buffered.close()
715
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed object must
    # raise ValueError.
    buffered = self.tp(self.MockRawIO())
    for _ in range(3):
        buffered.close()
    self.assertRaises(ValueError, buffered.flush)
723
def test_unseekable(self):
    # tell() and seek() on top of an unseekable raw stream must raise
    # UnsupportedOperation.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation
    with self.assertRaises(unsupported):
        bufio.tell()
    with self.assertRaises(unsupported):
        bufio.seek(0)
728
Guido van Rossum78892e42007-04-06 17:31:18 +0000729
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader tests shared by several implementations: concrete subclasses
    # set ``tp`` to the class under test.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() can be re-run on an existing object; buffer sizes <= 0
        # are rejected with ValueError and the object stays usable after a
        # later successful __init__().
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) both return the whole 7-byte stream.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() results are checked together with the mock's ``_reads``
        # counter after each call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() returns the number of bytes stored; a short read leaves
        # the rest of the target buffer untouched (b"gf" below).
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # readlines() with no argument, with a size hint, and with None.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # expected contents of rawio.read_history afterwards.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without argument returns the concatenated chunks.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek() and tell() over a MisbehavedRawIO must raise IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
907
908
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against io.BufferedReader and adds checks
    # specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead
        # Opening with "w+b" creates TESTFN; remove it once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so that only the cyclic GC can free f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000952
Guido van Rossuma9e20242007-03-08 00:43:48 +0000953
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer tests shared by several implementations: concrete subclasses
    # set ``tp`` to the class under test.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() can be re-run on an existing object; buffer sizes <= 0
        # are rejected with ValueError and the object stays usable after a
        # later successful __init__().
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Interleave writes with absolute, then relative, seeks that all end
        # back at the current position.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seek back, overwrite the start, then append past the old end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() hands the buffered bytes to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; make sure it does not outlive the test.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write() over a MisbehavedRawIO
        # must raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001168
1169
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against io.BufferedWriter and adds checks
    # specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, write() must raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead
        # Opening with "w+b" creates TESTFN; remove it once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so that only the cyclic GC can free f.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1207
1208
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001211
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair type: concrete subclasses set ``tp``
    # to the class under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit a
        # DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # readlines() with no argument, with None, and with a size hint.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write()/flush() round must reach the writer side as its own
        # chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume the data it returns.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001329
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001332
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001335
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336
1337class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1338 read_mode = "rb+"
1339 write_mode = "wb+"
1340
1341 def test_constructor(self):
1342 BufferedReaderTest.test_constructor(self)
1343 BufferedWriterTest.test_constructor(self)
1344
1345 def test_read_and_write(self):
1346 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001347 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001348
1349 self.assertEqual(b"as", rw.read(2))
1350 rw.write(b"ddd")
1351 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001352 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001353 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001354 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001355
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001356 def test_seek_and_tell(self):
1357 raw = self.BytesIO(b"asdfghjkl")
1358 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001359
1360 self.assertEquals(b"as", rw.read(2))
1361 self.assertEquals(2, rw.tell())
1362 rw.seek(0, 0)
1363 self.assertEquals(b"asdf", rw.read(4))
1364
1365 rw.write(b"asdf")
1366 rw.seek(0, 0)
1367 self.assertEquals(b"asdfasdfl", rw.read())
1368 self.assertEquals(9, rw.tell())
1369 rw.seek(-4, 2)
1370 self.assertEquals(5, rw.tell())
1371 rw.seek(2, 1)
1372 self.assertEquals(7, rw.tell())
1373 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001374 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001375
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001376 def check_flush_and_read(self, read_func):
1377 raw = self.BytesIO(b"abcdefghi")
1378 bufio = self.tp(raw)
1379
1380 self.assertEquals(b"ab", read_func(bufio, 2))
1381 bufio.write(b"12")
1382 self.assertEquals(b"ef", read_func(bufio, 2))
1383 self.assertEquals(6, bufio.tell())
1384 bufio.flush()
1385 self.assertEquals(6, bufio.tell())
1386 self.assertEquals(b"ghi", read_func(bufio))
1387 raw.seek(0, 0)
1388 raw.write(b"XYZ")
1389 # flush() resets the read buffer
1390 bufio.flush()
1391 bufio.seek(0, 0)
1392 self.assertEquals(b"XYZ", read_func(bufio, 3))
1393
1394 def test_flush_and_read(self):
1395 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1396
1397 def test_flush_and_readinto(self):
1398 def _readinto(bufio, n=-1):
1399 b = bytearray(n if n >= 0 else 9999)
1400 n = bufio.readinto(b)
1401 return bytes(b[:n])
1402 self.check_flush_and_read(_readinto)
1403
1404 def test_flush_and_peek(self):
1405 def _peek(bufio, n=-1):
1406 # This relies on the fact that the buffer can contain the whole
1407 # raw stream, otherwise peek() can return less.
1408 b = bufio.peek(n)
1409 if n != -1:
1410 b = b[:n]
1411 bufio.seek(len(b), 1)
1412 return b
1413 self.check_flush_and_read(_peek)
1414
1415 def test_flush_and_write(self):
1416 raw = self.BytesIO(b"abcdefghi")
1417 bufio = self.tp(raw)
1418
1419 bufio.write(b"123")
1420 bufio.flush()
1421 bufio.write(b"45")
1422 bufio.flush()
1423 bufio.seek(0, 0)
1424 self.assertEquals(b"12345fghi", raw.getvalue())
1425 self.assertEquals(b"12345fghi", bufio.read())
1426
1427 def test_threads(self):
1428 BufferedReaderTest.test_threads(self)
1429 BufferedWriterTest.test_threads(self)
1430
1431 def test_writes_and_peek(self):
1432 def _peek(bufio):
1433 bufio.peek(1)
1434 self.check_writes(_peek)
1435 def _peek(bufio):
1436 pos = bufio.tell()
1437 bufio.seek(-1, 1)
1438 bufio.peek(1)
1439 bufio.seek(pos, 0)
1440 self.check_writes(_peek)
1441
1442 def test_writes_and_reads(self):
1443 def _read(bufio):
1444 bufio.seek(-1, 1)
1445 bufio.read(1)
1446 self.check_writes(_read)
1447
1448 def test_writes_and_read1s(self):
1449 def _read1(bufio):
1450 bufio.seek(-1, 1)
1451 bufio.read1(1)
1452 self.check_writes(_read1)
1453
1454 def test_writes_and_readintos(self):
1455 def _read(bufio):
1456 bufio.seek(-1, 1)
1457 bufio.readinto(bytearray(1))
1458 self.check_writes(_read)
1459
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001460 def test_write_after_readahead(self):
1461 # Issue #6629: writing after the buffer was filled by readahead should
1462 # first rewind the raw stream.
1463 for overwrite_size in [1, 5]:
1464 raw = self.BytesIO(b"A" * 10)
1465 bufio = self.tp(raw, 4)
1466 # Trigger readahead
1467 self.assertEqual(bufio.read(1), b"A")
1468 self.assertEqual(bufio.tell(), 1)
1469 # Overwriting should rewind the raw stream if it needs so
1470 bufio.write(b"B" * overwrite_size)
1471 self.assertEqual(bufio.tell(), overwrite_size + 1)
1472 # If the write size was smaller than the buffer size, flush() and
1473 # check that rewind happens.
1474 bufio.flush()
1475 self.assertEqual(bufio.tell(), overwrite_size + 1)
1476 s = raw.getvalue()
1477 self.assertEqual(s,
1478 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1479
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001480 def test_truncate_after_read_or_write(self):
1481 raw = self.BytesIO(b"A" * 10)
1482 bufio = self.tp(raw, 100)
1483 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1484 self.assertEqual(bufio.truncate(), 2)
1485 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1486 self.assertEqual(bufio.truncate(), 4)
1487
def test_misbehaved_io(self):
    # Reuse the reader and the writer misbehaved-raw-stream tests, in
    # that order, against this class's buffered type.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1491
# You can't construct a BufferedRandom over a non-seekable stream, so the
# inherited test_unseekable is disabled for this class.
1493 test_unseekable = None
1494
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a sys.maxsize buffer must fail with one of
        # these errors.
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader and the writer garbage-collection tests.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1511
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1514
1515
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes needed to complete a character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1526
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: I = O = 1 and no pending input.
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Returns (pending bytes, flags) with flags = i*100 + o.  Both
        # lengths are XORed with 1 so that flags = 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        # Inverse of getstate(): undo the packing and the XOR.
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods with nothing buffered
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consumes self.buffer as one word: either updates I or O and
        # returns '', or returns the formatted output word.
        word = self.buffer
        marker = word[0]
        if marker in (ord('i'), ord('o')):
            # A missing number counts as 0; values are capped at 99.
            length = min(99, int(word[1:] or 0))
            if marker == ord('i'):
                self.i = length  # set input length
            else:
                self.o = length  # set output length
            output = ''
        else:
            output = word.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and
        # only while codecEnabled is true; otherwise returns None.
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1611
# Register the lookup function of the decoder defined above.
# The codec is disabled by default (codecEnabled is False); tests enable it.
1614codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1615
1616
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, `final` flag passed to decode(),
    # expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001659
1660class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001661
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the
    # same text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001666
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 def test_constructor(self):
1671 r = self.BytesIO(b"\xc3\xa9\n\n")
1672 b = self.BufferedReader(r, 1000)
1673 t = self.TextIOWrapper(b)
1674 t.__init__(b, encoding="latin1", newline="\r\n")
1675 self.assertEquals(t.encoding, "latin1")
1676 self.assertEquals(t.line_buffering, False)
1677 t.__init__(b, encoding="utf8", line_buffering=True)
1678 self.assertEquals(t.encoding, "utf8")
1679 self.assertEquals(t.line_buffering, True)
1680 self.assertEquals("\xe9\n", t.readline())
1681 self.assertRaises(TypeError, t.__init__, b, newline=42)
1682 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1683
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001684 def test_detach(self):
1685 r = self.BytesIO()
1686 b = self.BufferedWriter(r)
1687 t = self.TextIOWrapper(b)
1688 self.assertIs(t.detach(), b)
1689
1690 t = self.TextIOWrapper(b, encoding="ascii")
1691 t.write("howdy")
1692 self.assertFalse(r.getvalue())
1693 t.detach()
1694 self.assertEqual(r.getvalue(), b"howdy")
1695 self.assertRaises(ValueError, t.detach)
1696
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001697 def test_repr(self):
1698 raw = self.BytesIO("hello".encode("utf-8"))
1699 b = self.BufferedReader(raw)
1700 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001701 modname = self.TextIOWrapper.__module__
1702 self.assertEqual(repr(t),
1703 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1704 raw.name = "dummy"
1705 self.assertEqual(repr(t),
1706 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1707 raw.name = b"dummy"
1708 self.assertEqual(repr(t),
1709 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001711 def test_line_buffering(self):
1712 r = self.BytesIO()
1713 b = self.BufferedWriter(r, 1000)
1714 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001715 t.write("X")
1716 self.assertEquals(r.getvalue(), b"") # No flush happened
1717 t.write("Y\nZ")
1718 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1719 t.write("A\rB")
1720 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001722 def test_encoding(self):
1723 # Check the encoding attribute is always set, and valid
1724 b = self.BytesIO()
1725 t = self.TextIOWrapper(b, encoding="utf8")
1726 self.assertEqual(t.encoding, "utf8")
1727 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001728 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 codecs.lookup(t.encoding)
1730
1731 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001732 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001733 b = self.BytesIO(b"abc\n\xff\n")
1734 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001735 self.assertRaises(UnicodeError, t.read)
1736 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001737 b = self.BytesIO(b"abc\n\xff\n")
1738 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001739 self.assertRaises(UnicodeError, t.read)
1740 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001741 b = self.BytesIO(b"abc\n\xff\n")
1742 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001743 self.assertEquals(t.read(), "abc\n\n")
1744 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001745 b = self.BytesIO(b"abc\n\xff\n")
1746 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001747 self.assertEquals(t.read(), "abc\n\ufffd\n")
1748
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001749 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001750 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 b = self.BytesIO()
1752 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001753 self.assertRaises(UnicodeError, t.write, "\xff")
1754 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001755 b = self.BytesIO()
1756 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001757 self.assertRaises(UnicodeError, t.write, "\xff")
1758 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001759 b = self.BytesIO()
1760 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001761 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001762 t.write("abc\xffdef\n")
1763 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001764 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001765 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001766 b = self.BytesIO()
1767 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001768 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001769 t.write("abc\xffdef\n")
1770 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001771 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001773 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001774 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1775
1776 tests = [
1777 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001778 [ '', input_lines ],
1779 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1780 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1781 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001782 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001783 encodings = (
1784 'utf-8', 'latin-1',
1785 'utf-16', 'utf-16-le', 'utf-16-be',
1786 'utf-32', 'utf-32-le', 'utf-32-be',
1787 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001788
Guido van Rossum8358db22007-08-18 21:39:55 +00001789 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001790 # character in TextIOWrapper._pending_line.
1791 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001792 # XXX: str.encode() should return bytes
1793 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001794 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001795 for bufsize in range(1, 10):
1796 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1798 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001799 encoding=encoding)
1800 if do_reads:
1801 got_lines = []
1802 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001803 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001804 if c2 == '':
1805 break
1806 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001807 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001808 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001809 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001810
1811 for got_line, exp_line in zip(got_lines, exp_lines):
1812 self.assertEquals(got_line, exp_line)
1813 self.assertEquals(len(got_lines), len(exp_lines))
1814
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001815 def test_newlines_input(self):
1816 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001817 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1818 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001819 (None, normalized.decode("ascii").splitlines(True)),
1820 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001821 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1822 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1823 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001824 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001825 buf = self.BytesIO(testdata)
1826 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001827 self.assertEquals(txt.readlines(), expected)
1828 txt.seek(0)
1829 self.assertEquals(txt.read(), "".join(expected))
1830
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 def test_newlines_output(self):
1832 testdict = {
1833 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1834 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1835 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1836 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1837 }
1838 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1839 for newline, expected in tests:
1840 buf = self.BytesIO()
1841 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1842 txt.write("AAA\nB")
1843 txt.write("BB\nCCC\n")
1844 txt.write("X\rY\r\nZ")
1845 txt.flush()
1846 self.assertEquals(buf.closed, False)
1847 self.assertEquals(buf.getvalue(), expected)
1848
def test_destructor(self):
    # Dropping the last reference to a wrapper must close the underlying
    # stream, with the written text already in it.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the content at the moment close() is called.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
1862
def test_override_destructor(self):
    # A subclass overriding __del__(), close() and flush() must see them
    # called in that order when the object is collected.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    b = self.BytesIO()
    wrapper = MyTextIO(b, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1885
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def access_missing_attribute():
        # The wrapper is a temporary, so it is dropped while the
        # AttributeError is propagating.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as s:
        self.assertRaises(AttributeError, access_missing_attribute)
    s = s.getvalue().strip()
    if not s:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(s.splitlines()), 1)
    self.assertTrue(s.startswith("Exception IOError: "), s)
    self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001900
Guido van Rossum9b76da62007-04-11 01:09:03 +00001901 # Systematic tests of the text I/O API
1902
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings.  Files are opened with `with` so that a failing
    # assertion does not leave the scratch file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused as a seek target below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1931
def multi_line_test(self, f, enc):
    # Helper: empties `f`, writes lines of various lengths while
    # recording tell() before each one, then reads them back and checks
    # that every (position, line) pair matches.  `enc` is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # `size` characters cycling through `sample`, plus a newline.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1953
def test_telling(self):
    # tell() after writes must match tell() after reading the same
    # lines back, and tell() must fail while iterating over the file.
    # The file is opened with `with` so it is closed even on failure.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1973
def test_seeking(self):
    # Each line is two bytes shorter than the default chunk size and is
    # followed by a multi-byte character and a newline.  Both file
    # handles are closed by `with`, including the read handle.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1991
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The read handle is closed by `with` as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # The test passes if tell() does not raise.
        f.tell()
2003
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # `with` closes the file even when an assertion fails.
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002051 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002052 data = "1234567890"
2053 tests = ("utf-16",
2054 "utf-16-le",
2055 "utf-16-be",
2056 "utf-32",
2057 "utf-32-le",
2058 "utf-32-be")
2059 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002060 buf = self.BytesIO()
2061 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002062 # Check if the BOM is written only once (see issue1753).
2063 f.write(data)
2064 f.write(data)
2065 f.seek(0)
2066 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002067 f.seek(0)
2068 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002069 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2070
Benjamin Petersona1b49012009-03-31 23:11:32 +00002071 def test_unreadable(self):
2072 class UnReadable(self.BytesIO):
2073 def readable(self):
2074 return False
2075 txt = self.TextIOWrapper(UnReadable())
2076 self.assertRaises(IOError, txt.read)
2077
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002078 def test_read_one_by_one(self):
2079 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002080 reads = ""
2081 while True:
2082 c = txt.read(1)
2083 if not c:
2084 break
2085 reads += c
2086 self.assertEquals(reads, "AA\nBB")
2087
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002088 def test_readlines(self):
2089 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2090 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2091 txt.seek(0)
2092 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2093 txt.seek(0)
2094 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2095
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002096 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002097 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002098 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002099 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002100 reads = ""
2101 while True:
2102 c = txt.read(128)
2103 if not c:
2104 break
2105 reads += c
2106 self.assertEquals(reads, "A"*127+"\nB")
2107
2108 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002109 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002110
2111 # read one char at a time
2112 reads = ""
2113 while True:
2114 c = txt.read(1)
2115 if not c:
2116 break
2117 reads += c
2118 self.assertEquals(reads, self.normalized)
2119
2120 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002121 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002122 txt._CHUNK_SIZE = 4
2123
2124 reads = ""
2125 while True:
2126 c = txt.read(4)
2127 if not c:
2128 break
2129 reads += c
2130 self.assertEquals(reads, self.normalized)
2131
2132 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002133 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002134 txt._CHUNK_SIZE = 4
2135
2136 reads = txt.read(4)
2137 reads += txt.read(4)
2138 reads += txt.readline()
2139 reads += txt.readline()
2140 reads += txt.readline()
2141 self.assertEquals(reads, self.normalized)
2142
2143 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002144 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002145 txt._CHUNK_SIZE = 4
2146
2147 reads = txt.read(4)
2148 reads += txt.read()
2149 self.assertEquals(reads, self.normalized)
2150
2151 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002152 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002153 txt._CHUNK_SIZE = 4
2154
2155 reads = txt.read(4)
2156 pos = txt.tell()
2157 txt.seek(0)
2158 txt.seek(pos)
2159 self.assertEquals(txt.read(4), "BBB\n")
2160
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002161 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002162 buffer = self.BytesIO(self.testdata)
2163 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002164
2165 self.assertEqual(buffer.seekable(), txt.seekable())
2166
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The call is kept from the original test; its result was
            # never used, so it is no longer bound to a name.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # Encoding the whole string at once produces a single
            # leading BOM, which is what the file must contain.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2181
def test_seek_bom(self):
    # Same check as for append mode, but reaching the end of the
    # existing data (and then the start) by seeking manually.
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write after the existing text first, then overwrite the
            # existing text from position 0.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2196
def test_errors_property(self):
    # The errors attribute is "strict" when no handler is given and
    # otherwise echoes the handler passed to open().
    scenarios = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in scenarios:
        with self.open(support.TESTFN, "w", **open_kwargs) as f:
            self.assertEqual(f.errors, expected)
2202
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    nthreads = 20
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # All threads block here until the event is set, so that
            # their writes happen as close together as possible.
            start.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(nthreads)]
        for t in threads:
            t.start()
        # Give the threads a moment to reach start.wait().
        time.sleep(0.02)
        start.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(nthreads):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2224
Antoine Pitrou6be88762010-05-03 16:48:20 +00002225 def test_flush_error_on_close(self):
2226 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2227 def bad_flush():
2228 raise IOError()
2229 txt.flush = bad_flush
2230 self.assertRaises(IOError, txt.close) # exception not swallowed
2231
2232 def test_multi_close(self):
2233 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2234 txt.close()
2235 txt.close()
2236 txt.close()
2237 self.assertRaises(ValueError, txt.flush)
2238
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2243
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # Calling __init__ again with an invalid newline argument must
        # raise, and a subsequent read() must raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42),
                                      (ValueError, 'xyzzy')):
            with self.assertRaises(exc_type):
                wrapper.__init__(buffered, newline=bad_newline)
            with self.assertRaises(ValueError):
                wrapper.read()

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so that only the garbage collector can
        # reclaim the object.
        wrapper.x = wrapper
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2270
# Runs the inherited TextIOWrapper tests unchanged; test_main() fills in
# the implementation namespace for classes whose name starts with "Py".
class PyTextIOWrapperTest(TextIOWrapperTest):
    pass
2273
2274
2275class IncrementalNewlineDecoderTest(unittest.TestCase):
2276
2277 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002278 # UTF-8 specific tests for a newline decoder
2279 def _check_decode(b, s, **kwargs):
2280 # We exercise getstate() / setstate() as well as decode()
2281 state = decoder.getstate()
2282 self.assertEquals(decoder.decode(b, **kwargs), s)
2283 decoder.setstate(state)
2284 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002285
Antoine Pitrou180a3362008-12-14 16:36:46 +00002286 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287
Antoine Pitrou180a3362008-12-14 16:36:46 +00002288 _check_decode(b'\xe8', "")
2289 _check_decode(b'\xa2', "")
2290 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002291
Antoine Pitrou180a3362008-12-14 16:36:46 +00002292 _check_decode(b'\xe8', "")
2293 _check_decode(b'\xa2', "")
2294 _check_decode(b'\x88', "\u8888")
2295
2296 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002297 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2298
Antoine Pitrou180a3362008-12-14 16:36:46 +00002299 decoder.reset()
2300 _check_decode(b'\n', "\n")
2301 _check_decode(b'\r', "")
2302 _check_decode(b'', "\n", final=True)
2303 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002304
Antoine Pitrou180a3362008-12-14 16:36:46 +00002305 _check_decode(b'\r', "")
2306 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002307
Antoine Pitrou180a3362008-12-14 16:36:46 +00002308 _check_decode(b'\r\r\n', "\n\n")
2309 _check_decode(b'\r', "")
2310 _check_decode(b'\r', "\n")
2311 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002312
Antoine Pitrou180a3362008-12-14 16:36:46 +00002313 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2314 _check_decode(b'\xe8\xa2\x88', "\u8888")
2315 _check_decode(b'\n', "\n")
2316 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2317 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002318
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002319 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002320 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 if encoding is not None:
2322 encoder = codecs.getincrementalencoder(encoding)()
2323 def _decode_bytewise(s):
2324 # Decode one byte at a time
2325 for b in encoder.encode(s):
2326 result.append(decoder.decode(bytes([b])))
2327 else:
2328 encoder = None
2329 def _decode_bytewise(s):
2330 # Decode one char at a time
2331 for c in s:
2332 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002333 self.assertEquals(decoder.newlines, None)
2334 _decode_bytewise("abc\n\r")
2335 self.assertEquals(decoder.newlines, '\n')
2336 _decode_bytewise("\nabc")
2337 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2338 _decode_bytewise("abc\r")
2339 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2340 _decode_bytewise("abc")
2341 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2342 _decode_bytewise("abc\r")
2343 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2344 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 input = "abc"
2346 if encoder is not None:
2347 encoder.reset()
2348 input = encoder.encode(input)
2349 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002350 self.assertEquals(decoder.newlines, None)
2351
2352 def test_newline_decoder(self):
2353 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002354 # None meaning the IncrementalNewlineDecoder takes unicode input
2355 # rather than bytes input
2356 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002357 'utf-16', 'utf-16-le', 'utf-16-be',
2358 'utf-32', 'utf-32-le', 'utf-32-be',
2359 )
2360 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 decoder = enc and codecs.getincrementaldecoder(enc)()
2362 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2363 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002364 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2366 self.check_newline_decoding_utf8(decoder)
2367
Antoine Pitrou66913e22009-03-06 23:40:56 +00002368 def test_newline_bytes(self):
2369 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2370 def _check(dec):
2371 self.assertEquals(dec.newlines, None)
2372 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2373 self.assertEquals(dec.newlines, None)
2374 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2375 self.assertEquals(dec.newlines, None)
2376 dec = self.IncrementalNewlineDecoder(None, translate=False)
2377 _check(dec)
2378 dec = self.IncrementalNewlineDecoder(None, translate=True)
2379 _check(dec)
2380
# Runs the inherited decoder tests unchanged; test_main() fills in the
# implementation namespace for classes whose name starts with "C".
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
2383
# Runs the inherited decoder tests unchanged; test_main() fills in the
# implementation namespace for classes whose name starts with "Py".
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002386
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002387
Guido van Rossum01a27522007-03-07 01:00:12 +00002388# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002389
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the io namespace.  Subclasses set `io` to the
    # module under test; test_main() copies that module's public names
    # (open, IOBase, ...) onto the subclass as attributes.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name in __all__ must exist and, apart from open() and the
        # SEEK_* constants, be an exception or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Check the name and mode attributes at each layer of the stack.
        # Files are opened in with-blocks so that a failing assertion
        # cannot leave them open.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # g shares f's descriptor without owning it (closefd=False),
            # so it is closed before f releases the descriptor.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method of a closed file must raise ValueError, for
        # each combination of mode and buffering below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto only exist on some layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass the right argument type for the mode, so that the
            # ValueError cannot be masked by a TypeError.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument
        # and check that the garbage collector can reclaim it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        for base in (self.IOBase, self.RawIOBase,
                     self.BufferedIOBase, self.TextIOBase):
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # For each kind of file object returned by open(), check which of
        # the four ABCs found on `abcmodule` it is (and is not) an
        # instance of.
        abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
        scenarios = (
            ({"mode": "wb", "buffering": 0}, ("IOBase", "RawIOBase")),
            ({"mode": "wb"}, ("IOBase", "BufferedIOBase")),
            ({"mode": "w"}, ("IOBase", "TextIOBase")),
        )
        for open_kwargs, expected in scenarios:
            with self.open(support.TESTFN, **open_kwargs) as f:
                for abc_name in abc_names:
                    base = getattr(abcmodule, abc_name)
                    if abc_name in expected:
                        self.assertIsInstance(f, base)
                    else:
                        self.assertNotIsInstance(f, base)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs.
        # The test case itself serves as the namespace: it carries the
        # ABCs of the implementation under test as attributes.
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2525
# Runs the MiscIOTest checks against the `io` module.
class CMiscIOTest(MiscIOTest):
    io = io
2528
# Runs the MiscIOTest checks against the `pyio` module.
class PyMiscIOTest(MiscIOTest):
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002531
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002532
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks that signal handlers run when a write is interrupted.
    # Subclasses set `io` to the module under test.

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError: the tests use it as the
        # visible sign that the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound before the try block so that the cleanup code can tell
        # whether open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm if it has not fired yet, so that SIGALRM
            # cannot arrive after tearDown() has restored the old handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2593
# Runs the SignalsTest checks against the `io` module.
class CSignalsTest(SignalsTest):
    io = io
2596
# Runs the SignalsTest checks against the `pyio` module.
class PySignalsTest(SignalsTest):
    io = pyio
2599
2600
def test_main():
    # Collect the test classes, inject the matching implementation
    # namespace into each of them, then run everything.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
    module_globals = globals()
    # Each mock is published under its plain name, but resolves to the
    # "C"- or "Py"-prefixed variant defined at module level.
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent tests get no injected names.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002635
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()