blob: a4124697a849efb0f6e0407e6010d3fab5f23a69 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this source file in text mode and returns the ``_CHUNK_SIZE``
    attribute of the resulting file object.
    """
    # latin-1 is given explicitly so opening the file does not depend on the
    # locale's default encoding; no data is actually read.
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
55
56
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks served, in order, by readinto().
    A ``None`` entry is consumed and returned as ``None``.  Once the stack is
    empty, readinto() returns 0 and counts the call in ``_extraneous_reads``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        # Every buffer passed to write(), converted to bytes.
        self._write_stack = []
        # Total number of read calls, and how many of them hit an empty stack.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and return its length."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next chunk of the read stack into *buf*.

        Returns the number of bytes copied, ``None`` when the next stack entry
        is ``None``, or 0 when the stack is exhausted.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Fill the buffer and leave the remainder for the next call.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
112
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # MockRawIOWithoutRead on top of the C implementation's RawIOBase.
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # MockRawIOWithoutRead on top of the pure-Python RawIOBase.
    pass
118
119
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that pops whole chunks."""

    def read(self, n=None):
        """Return the next entry of the read stack, ignoring *n*.

        When the stack is empty, count the call as extraneous and return
        ``b""``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is expected here; anything else
            # (including KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""
129
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO on top of the C implementation's RawIOBase.
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO on top of the pure-Python RawIOBase.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus values."""

    def write(self, b):
        # Report twice the length that the parent class reports.
        return super().write(b) * 2

    def read(self, n=None):
        # Return the parent's result repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then claim five times its size.
        super().readinto(buf)
        return len(buf) * 5
153
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO on top of the C implementation's RawIOBase.
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO on top of the pure-Python RawIOBase.
    pass
159
160
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises IOError."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = 0

    def close(self):
        if not self.closed:
            # Mark as closed *before* raising so later calls are no-ops.
            self.closed = 1
            raise IOError
168
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO on top of the C implementation's RawIOBase.
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO on top of the pure-Python RawIOBase.
    pass
174
175
class MockFileIO:
    """Mixin that records the result size of every read()/readinto() call.

    It relies on the next class in the MRO to provide ``__init__(data)``,
    ``read()`` and ``readinto()``.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        # Log the length of the data read, or None if the read returned None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
class CMockFileIO(MockFileIO, io.BytesIO):
    # MockFileIO on top of the C implementation's BytesIO.
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # MockFileIO on top of the pure-Python BytesIO.
    pass
197
198
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    Subclasses must provide an ``UnsupportedOperation`` attribute, which is
    the exception type raised by seek() and tell().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")
208
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type raised by MockUnseekableIO.seek()/tell().
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type raised by MockUnseekableIO.seek()/tell().
    UnsupportedOperation = pyio.UnsupportedOperation
214
215
class MockNonBlockWriterIO:
    """Mock raw writer that can simulate a write that would block.

    After ``block_on(char)``, a write() whose data contains *char* is cut
    short just before it; if *char* is the very first byte, write() returns
    ``None`` and the blocker is cleared.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b* and return the number of bytes accepted.

        Returns ``None`` when the data starts with the blocker char.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # No blocker in this chunk: fall through to a full write.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)
259
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # BlockingIOError type of the C implementation.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # BlockingIOError type of the pure-Python implementation.
    BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
class IOTest(unittest.TestCase):
    # Generic tests of the io API.  The implementation under test is looked up
    # through attributes of ``self`` (self.open, self.FileIO, self.BytesIO,
    # self.IOBase, self.UnsupportedOperation, ...) that this class does not
    # define itself; they have to be supplied from outside the class.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed sequence of write/seek/truncate operations on
        # the writable stream *f*, leaving b"hello world\n" as its content.
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed sequence of read/readinto/seek operations on a
        # stream *f* whose content is b"hello world\n".  The argument-less
        # read() calls are only made when *buffered* is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by large_file_ops(): just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around offset LARGE on the read-write
        # stream *f*.
        # assertTrue rather than a bare assert, so the checks survive -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and passing silently.
                self.skipTest(
                    "Testing large file ops skipped on %s. "
                    "It requires %d bytes and a long time. "
                    "Use 'regrtest.py -u largefile test_io' to run it."
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block exits normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Collecting an unclosed FileIO subclass must call __del__, close()
        # and flush() in that order, and the data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: same __del__/close()/flush() ordering check as
        # test_destructor, for a subclass of the abstract class *base*.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        # The opener ignores the requested path and hands back the fd above.
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
658
659
class CIOTest(IOTest):
    # IOTest plus a regression test specific to the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000679
class PyIOTest(IOTest):
    # IOTest with no additions of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000682
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # Users of this mixin provide ``tp`` (the buffered class under test) and
    # the mock raw-stream classes that are looked up through ``self``.

    def test_detach(self):
        # detach() returns the raw stream once; a second call must fail.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        self.assertRaises(ValueError, buffered.detach)

    def test_fileno(self):
        # The buffered object is expected to report 42 as its fileno();
        # presumably that is what the mock raw stream returns (the mock is
        # defined outside this section).
        bufio = self.tp(self.MockRawIO())

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, bufio.seek, 0, bad_whence)

    def test_override_destructor(self):
        # ``calls`` records the order in which the overridden hooks run:
        # 1 = __del__, 2 = close, 3 = flush.
        calls = []

        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                try:
                    inherited = super().__del__
                except AttributeError:
                    # The base class defines no __del__ to chain to.
                    return
                inherited()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Must be queried while the object is still alive.
        expected = [1, 2, 3] if bufio.writable() else [1, 2]
        del bufio
        support.gc_collect()
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())

        def enter_and_exit():
            with bufio:
                pass

        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_exit)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        stderr_text = captured.getvalue().strip()
        if stderr_text:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(stderr_text.splitlines()), 1)
            self.assertTrue(stderr_text.startswith("Exception IOError: "),
                            stderr_text)
            self.assertTrue(stderr_text.endswith(" ignored"), stderr_text)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's name
        # once one has been assigned (str or bytes).
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()

        def bad_flush():
            raise IOError()

        raw_stream.flush = bad_flush
        buffered = self.tp(raw_stream)
        self.assertRaises(IOError, buffered.close)  # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing afterwards is an error.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound on a live buffered object.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
803
Guido van Rossum78892e42007-04-06 17:31:18 +0000804
class SizeofTest:
    """CPython-only checks of how sys.getsizeof() accounts for the buffer."""

    @support.cpython_only
    def test_sizeof(self):
        # The size left after subtracting the buffer must be the same
        # whatever buffer size was requested.
        small_buffer, large_buffer = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small_buffer)
        overhead = sys.getsizeof(bufio) - small_buffer
        bufio = self.tp(self.MockRawIO(), buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(bufio), overhead + large_buffer)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must no longer include the buffer.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        size_without_buffer = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size_without_buffer)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200826
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Read-side tests for a buffered reader class.

    Subclasses set ``tp`` to the concrete class under test.
    """
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() is checked against the mock's raw-read counter after
        # every call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the first byte of the target.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readlines(self):
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected in the raw stream's read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Chunk order is arbitrary, so only per-value counts are
                # compared.
                received = b''.join(results)
                for i in range(256):
                    self.assertEqual(received.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same checks as the common test, repeated after a read has
        # happened.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1022
1023
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    """BufferedReaderTest bound to io.BufferedReader, plus extra checks."""
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening in "w+b" creates the file, so remove it when the test ends.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference puts the object in a cycle that only the cyclic
        # collector can break.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001064
class PyBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest bound to pyio.BufferedReader."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001067
Guido van Rossuma9e20242007-03-08 00:43:48 +00001068
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Write-side tests for a buffered writer class.

    Subclasses set ``tp`` to the concrete class under test.
    """
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # Buffered data reaches the raw stream when detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and then restore it.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Use the BytesIO of the implementation under test rather than
        # always the one from the io module.
        raw = self.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the buffered object pushes pending data to the raw
        # stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk, so remove it when the test ends.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Write order is arbitrary, so only per-value counts are compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001306
1307
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    """BufferedWriterTest bound to io.BufferedWriter, plus extra checks."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk, so remove it when the test ends.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference puts the object in a cycle that only the cyclic
        # collector can break.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1345
1346
class PyBufferedWriterTest(BufferedWriterTest):
    """BufferedWriterTest bound to pyio.BufferedWriter."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001349
Guido van Rossum01a27522007-03-07 01:00:12 +00001350class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001351
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001352 def test_constructor(self):
1353 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001354 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001355
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001356 def test_detach(self):
1357 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1358 self.assertRaises(self.UnsupportedOperation, pair.detach)
1359
Florent Xicluna109d5732012-07-07 17:03:22 +02001360 def test_constructor_max_buffer_size_removal(self):
1361 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001362 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001363
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001364 def test_constructor_with_not_readable(self):
1365 class NotReadable(MockRawIO):
1366 def readable(self):
1367 return False
1368
1369 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1370
1371 def test_constructor_with_not_writeable(self):
1372 class NotWriteable(MockRawIO):
1373 def writable(self):
1374 return False
1375
1376 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1377
1378 def test_read(self):
1379 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1380
1381 self.assertEqual(pair.read(3), b"abc")
1382 self.assertEqual(pair.read(1), b"d")
1383 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001384 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1385 self.assertEqual(pair.read(None), b"abc")
1386
1387 def test_readlines(self):
1388 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1389 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1390 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1391 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001392
1393 def test_read1(self):
1394 # .read1() is delegated to the underlying reader object, so this test
1395 # can be shallow.
1396 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1397
1398 self.assertEqual(pair.read1(3), b"abc")
1399
1400 def test_readinto(self):
1401 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1402
1403 data = bytearray(5)
1404 self.assertEqual(pair.readinto(data), 5)
1405 self.assertEqual(data, b"abcde")
1406
def test_write(self):
    # Every write followed by a flush must arrive, in order, on the
    # write stack of the second (writer) stream.
    sink = self.MockRawIO()
    rw_pair = self.tp(self.MockRawIO(), sink)

    for payload in (b"abc", b"def"):
        rw_pair.write(payload)
        rw_pair.flush()
    self.assertEqual(sink._write_stack, [b"abc", b"def"])
1416
def test_peek(self):
    # peek() may return more than requested, so only the prefix is checked;
    # the following read() must still return the same leading bytes.
    rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    peeked = rw_pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(rw_pair.read(3), b"abc")
1422
def test_readable(self):
    # A pair built from two mock raw streams reports itself readable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertTrue(rw_pair.readable())
1426
def test_writeable(self):
    # A pair built from two mock raw streams reports itself writable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertTrue(rw_pair.writable())
1430
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and
    # writers are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertFalse(rw_pair.seekable())
1436
1437 # .flush() is delegated to the underlying writer object and has been
1438 # tested in the test_write method.
1439
def test_close_and_closed(self):
    # The closed attribute must flip from false to true across close().
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertFalse(rw_pair.closed)
    rw_pair.close()
    self.assertTrue(rw_pair.closed)
1445
def test_isatty(self):
    # The pair is a tty as soon as either of its two streams says it is.
    class FixedIsattyRaw(MockRawIO):
        # Mock raw stream whose isatty() answer is chosen at construction.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader answer, writer answer, expected answer of the pair)
    scenarios = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, pair_tty in scenarios:
        rw_pair = self.tp(FixedIsattyRaw(reader_tty),
                          FixedIsattyRaw(writer_tty))
        check = self.assertTrue if pair_tty else self.assertFalse
        check(rw_pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001466
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against the ``io`` module's class.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001469
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against the ``pyio`` module's class.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001473
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for the read/write buffered class stored in ``tp`` by the concrete
    # subclasses.  The reader and writer suites are both inherited.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parent suites, reader first.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        buffered = self.tp(raw, 8)

        self.assertEqual(b"as", buffered.read(2))
        for payload in (b"ddd", b"eee"):
            buffered.write(payload)
        # Nothing may have reached the raw stream yet: writes are buffered.
        self.assertFalse(raw._write_stack)
        self.assertEqual(b"ghjk", buffered.read())
        # After the read, both writes show up as the first raw write.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        buffered = self.tp(raw)

        self.assertEqual(b"as", buffered.read(2))
        self.assertEqual(2, buffered.tell())
        buffered.seek(0, 0)
        self.assertEqual(b"asdf", buffered.read(4))

        # Overwrite four bytes at the current position, then re-read it all.
        buffered.write(b"123f")
        buffered.seek(0, 0)
        self.assertEqual(b"asdf123fl", buffered.read())
        self.assertEqual(9, buffered.tell())
        # Seek relative to the end, then relative to the current position.
        buffered.seek(-4, 2)
        self.assertEqual(5, buffered.tell())
        buffered.seek(2, 1)
        self.assertEqual(7, buffered.tell())
        self.assertEqual(b"fl", buffered.read(11))
        buffered.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is not acceptable.
        self.assertRaises(TypeError, buffered.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is called
        # as read_func(bufio, n) or read_func(bufio) and must return the bytes
        # it consumed from bufio.
        raw = self.BytesIO(b"abcdefghi")
        buffered = self.tp(raw)

        self.assertEqual(b"ab", read_func(buffered, 2))
        buffered.write(b"12")
        self.assertEqual(b"ef", read_func(buffered, 2))
        self.assertEqual(6, buffered.tell())
        buffered.flush()
        self.assertEqual(6, buffered.tell())
        self.assertEqual(b"ghi", read_func(buffered))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(buffered, 3))

    def test_flush_and_read(self):
        def via_read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(via_read)

    def test_flush_and_readinto(self):
        def via_readinto(bufio, n=-1):
            # A negative n means "read everything": use an oversized target.
            size = 9999 if n < 0 else n
            target = bytearray(size)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(via_readinto)

    def test_flush_and_peek(self):
        def via_peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(via_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        buffered = self.tp(raw)

        for payload in (b"123", b"45"):
            buffered.write(payload)
            buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", buffered.read())

    def test_threads(self):
        # Run the threading checks of both parent suites, reader first.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(peek_in_place)

        def peek_one_back(bufio):
            # Peek at the previous byte, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(peek_one_back)

    def test_writes_and_reads(self):
        def reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(reread_last)

    def test_writes_and_read1s(self):
        def reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(reread_last)

    def test_writes_and_readintos(self):
        def reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            buffered = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(buffered.read(1), b"A")
            self.assertEqual(buffered.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            buffered.write(b"B" * overwrite_size)
            end_of_write = overwrite_size + 1
            self.assertEqual(buffered.tell(), end_of_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            buffered.flush()
            self.assertEqual(buffered.tell(), end_of_write)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                buffered = self.tp(raw, 100)
                mutate(buffered, first, second)
                buffered.flush()
                # The later write (at first) wins when both positions match.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d"
                                 % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        buffered = self.tp(raw, 100)
        # the read buffer gets filled
        self.assertEqual(buffered.read(2), b"AA")
        self.assertEqual(buffered.truncate(), 2)
        # the write buffer increases
        self.assertEqual(buffered.write(b"BB"), 2)
        self.assertEqual(buffered.truncate(), 4)

    def test_misbehaved_io(self):
        # Run the misbehaved-IO checks of both parent suites, reader first.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                # Alternate one-byte writes with each kind of one-byte read.
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                one_byte = bytearray(1)
                f.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                # Each write replaces the first byte of the next line; the
                # readline() that follows returns the rest of that line.
                steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
                for written, rest_of_line in steps:
                    f.write(written)
                    self.assertEqual(f.readline(), rest_of_line)
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1697
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # BufferedRandom tests run against the ``io`` module's class, plus the
    # checks that only apply to it.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an enormous buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C parent suites.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1714
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the ``pyio`` module's class.
    tp = pyio.BufferedRandom
1717
1718
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001719# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1720# properties:
1721# - A single output character can correspond to many bytes of input.
1722# - The number of input bytes to complete the character can be
1723# undetermined until the last input byte is received.
1724# - The number of input bytes can vary depending on previous input.
1725# - A single input byte can correspond to many characters of output.
1726# - The number of output characters can be undetermined until the
1727# last input byte is received.
1728# - The number of output characters can vary depending on previous input.
1729
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  The word buffer is emptied after every
    completed word, so a new I applies to the bytes that follow the word
    that set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty word buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (pending bytes, flags) with flags encoding I and O.

        I and O are each XOR-ed with 1 so that flags == 0 after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of completed words.

        When *final* is true, a partially buffered word is completed too.
        """
        period = ord('.')
        # Collect the pieces and join once instead of growing a string.
        pieces = []
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i'/'o' words update I/O and produce no text; any other word is
        padded or truncated to O characters (when O is non-zero) and
        followed by a period.
        """
        output = ''
        first = self.buffer[0]
        if first == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif first == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            # Pad out with hyphens; ljust() is a no-op when O is 0.
            output = self.buffer.decode('ascii').ljust(self.o, '-')
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The lookup function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo whose incremental decoder is this class, or
        None when the codec is disabled or *name* is a different codec.
        """
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1814
# Register the lookup function of the decoder defined above for testing.
# It stays inert by default (codecEnabled is False); tests enable it by
# setting StatefulIncrementalDecoder.codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1818
1819
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw_bytes, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw_bytes, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001862
1863class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001864
def setUp(self):
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
    # Sample bytes mixing \r\n, \r and \n line endings, together with the
    # same text using \n only.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001869
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001872
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    # Calling __init__ again must replace the previous settings.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())
    # Bad newline arguments: a wrong type, then an unsupported string.
    for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
        self.assertRaises(exc_type, text.__init__, buffered,
                          newline=bad_newline)
1886
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the very object that was wrapped.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream before detaching ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... and everything has afterwards.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, text.detach)
1899
def test_repr(self):
    # repr() grows a name and a mode field as those attributes appear.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001916
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw contents expected right after the write)
    steps = (
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, flushed_so_far in steps:
        text.write(written)
        self.assertEqual(raw.getvalue(), flushed_so_far)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001927
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        expected_encoding = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw)
        self.assertEqual(text.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was on entry.
        os.environ.clear()
        os.environ.update(saved_environ)
1945
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")
    # Without an explicit encoding one must still be chosen ...
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # ... and it must name a codec that lookup() can find (raises otherwise).
    codecs.lookup(t.encoding)
1954
def test_encoding_errors_reading(self):
    undecodable = b"abc\n\xff\n"
    # (1) default and (2) explicit strict: the bad byte raises
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO(undecodable)
        text = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.read)
    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD
    for policy, expected in (("ignore", "abc\n\n"),
                             ("replace", "abc\n\ufffd\n")):
        raw = self.BytesIO(undecodable)
        text = self.TextIOWrapper(raw, encoding="ascii", errors=policy)
        self.assertEqual(text.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001972
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: the unencodable char raises
    for extra in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.write, "\xff")
    # (3) ignore drops the character, (4) replace writes b"?" instead
    for policy, expected in (("ignore", b"abcdef\n"),
                             ("replace", b"abc?def\n")):
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=policy,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001996
def test_newlines(self):
    # Check how the same text is split into lines for every newline mode,
    # across several encodings, reading styles and buffer sizes.
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # [newline argument, lines expected back]
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra conversion needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): every line starts
                        # with a two-character read.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One list comparison covers both the per-line contents
                    # and the number of lines.
                    self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00002038
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    scenarios = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in scenarios:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Reading everything in one go must give the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002054
def test_newlines_output(self):
    # Bytes expected on the raw stream for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like newline=os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002072
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # stream, with the written text already flushed into it.
    seen_at_close = []
    base = self.BytesIO

    class RecordingBytesIO(base):
        def close(self):
            # Capture the contents at the moment close() is called.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = RecordingBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002086
def test_override_destructor(self):
    # Destroying the object must call __del__, close() and flush(), in
    # that order, on a subclass overriding all three.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The parent may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2109
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The wrapper is created and dropped inside this call.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002124
Guido van Rossum9b76da62007-04-11 01:09:03 +00002125 # Systematic tests of the text I/O API
2126
def test_basic_io(self):
    # Exercise write/read/seek/tell on a text file for several chunk
    # sizes and encodings.  Files are opened with "with" so a failing
    # assertion cannot leak an open handle on support.TESTFN.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie at end of file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2155
def multi_line_test(self, f, enc):
    # Helper: truncate f, write lines of various lengths, then read them
    # back with readline() and check that each (tell(), line) pair matches
    # what was recorded while writing.  `enc` is accepted but not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    pos = f.tell()
    line = f.readline()
    while line:
        read_back.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002177
def test_telling(self):
    # tell() must return the positions seen while writing once the same
    # lines are read back, and must raise while iterating with "for".
    # The file is managed by "with" so it is closed even on failure.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail in the middle of iteration.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2197
def test_seeking(self):
    # Write two lines whose multi-byte suffix starts two bytes before the
    # default chunk size, then read the prefix and check tell()/readline().
    prefix_len = _default_chunk_size() - 2
    prefix_text = "a" * prefix_len
    prefix_bytes = prefix_text.encode("utf-8")
    self.assertEqual(len(prefix_text), len(prefix_bytes))
    suffix_text = "\u8888\n"
    suffix_bytes = suffix_text.encode("utf-8")
    payload = (prefix_bytes + suffix_bytes) * 2
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_len)
        self.assertEqual(head, prefix_bytes.decode("ascii"))
        self.assertEqual(reader.tell(), prefix_len)
        self.assertEqual(reader.readline(), suffix_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002214
def test_seeking_too(self):
    # Regression test for a specific bug
    with self.open(support.TESTFN, "wb") as out:
        out.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        reader.readline()
        # Passes as long as tell() does not raise.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002225
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # All files are opened with "with" so that a failing assertion
        # does not leave a handle open on support.TESTFN.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # NOTE(review): _CHUNK_SIZE is not set on this handle,
                # unlike the one above -- confirm that is intended.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002272
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002273 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002274 data = "1234567890"
2275 tests = ("utf-16",
2276 "utf-16-le",
2277 "utf-16-be",
2278 "utf-32",
2279 "utf-32-le",
2280 "utf-32-be")
2281 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002282 buf = self.BytesIO()
2283 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002284 # Check if the BOM is written only once (see issue1753).
2285 f.write(data)
2286 f.write(data)
2287 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002288 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002289 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002290 self.assertEqual(f.read(), data * 2)
2291 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002292
Benjamin Petersona1b49012009-03-31 23:11:32 +00002293 def test_unreadable(self):
2294 class UnReadable(self.BytesIO):
2295 def readable(self):
2296 return False
2297 txt = self.TextIOWrapper(UnReadable())
2298 self.assertRaises(IOError, txt.read)
2299
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002300 def test_read_one_by_one(self):
2301 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002302 reads = ""
2303 while True:
2304 c = txt.read(1)
2305 if not c:
2306 break
2307 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002308 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002309
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002310 def test_readlines(self):
2311 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2312 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2313 txt.seek(0)
2314 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2315 txt.seek(0)
2316 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2317
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002318 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002319 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002320 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002322 reads = ""
2323 while True:
2324 c = txt.read(128)
2325 if not c:
2326 break
2327 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002328 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002329
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002330 def test_writelines(self):
2331 l = ['ab', 'cd', 'ef']
2332 buf = self.BytesIO()
2333 txt = self.TextIOWrapper(buf)
2334 txt.writelines(l)
2335 txt.flush()
2336 self.assertEqual(buf.getvalue(), b'abcdef')
2337
2338 def test_writelines_userlist(self):
2339 l = UserList(['ab', 'cd', 'ef'])
2340 buf = self.BytesIO()
2341 txt = self.TextIOWrapper(buf)
2342 txt.writelines(l)
2343 txt.flush()
2344 self.assertEqual(buf.getvalue(), b'abcdef')
2345
2346 def test_writelines_error(self):
2347 txt = self.TextIOWrapper(self.BytesIO())
2348 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2349 self.assertRaises(TypeError, txt.writelines, None)
2350 self.assertRaises(TypeError, txt.writelines, b'abc')
2351
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002352 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002353 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002354
2355 # read one char at a time
2356 reads = ""
2357 while True:
2358 c = txt.read(1)
2359 if not c:
2360 break
2361 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002362 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002363
2364 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002366 txt._CHUNK_SIZE = 4
2367
2368 reads = ""
2369 while True:
2370 c = txt.read(4)
2371 if not c:
2372 break
2373 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002374 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002375
2376 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002378 txt._CHUNK_SIZE = 4
2379
2380 reads = txt.read(4)
2381 reads += txt.read(4)
2382 reads += txt.readline()
2383 reads += txt.readline()
2384 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002385 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002386
2387 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002388 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002389 txt._CHUNK_SIZE = 4
2390
2391 reads = txt.read(4)
2392 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002393 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002394
2395 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002396 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002397 txt._CHUNK_SIZE = 4
2398
2399 reads = txt.read(4)
2400 pos = txt.tell()
2401 txt.seek(0)
2402 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002403 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002404
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002405 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002406 buffer = self.BytesIO(self.testdata)
2407 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002408
2409 self.assertEqual(buffer.seekable(), txt.seekable())
2410
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The result was bound to an unused local before; the call
            # is kept so the same operations run on the file.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002425
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_first_write = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write at the end first, then overwrite from the start.
            updater.seek(end_of_first_write)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            raw = reader.read()
            self.assertEqual(raw, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002440
def test_errors_property(self):
    # The errors attribute is "strict" when not specified, otherwise it
    # is the value passed to open().
    with self.open(support.TESTFN, "w") as default_handle:
        self.assertEqual(default_handle.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as replace_handle:
        self.assertEqual(replace_handle.errors, "replace")
2446
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until all threads are released together.
            start.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002469
Antoine Pitrou6be88762010-05-03 16:48:20 +00002470 def test_flush_error_on_close(self):
2471 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2472 def bad_flush():
2473 raise IOError()
2474 txt.flush = bad_flush
2475 self.assertRaises(IOError, txt.close) # exception not swallowed
2476
2477 def test_multi_close(self):
2478 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2479 txt.close()
2480 txt.close()
2481 txt.close()
2482 self.assertRaises(ValueError, txt.flush)
2483
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002484 def test_unseekable(self):
2485 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2486 self.assertRaises(self.UnsupportedOperation, txt.tell)
2487 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2488
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002489 def test_readonly_attributes(self):
2490 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2491 buf = self.BytesIO(self.testdata)
2492 with self.assertRaises(AttributeError):
2493 txt.buffer = buf
2494
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(chunks)
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2505
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(chunks)
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() before inspecting what reached the raw object.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
2515
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest cases plus tests written for the C TextIOWrapper."""

    def test_initialization(self):
        # A failed re-initialisation must leave the object unusable:
        # read() raises ValueError after each bad __init__ call.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Self-reference so that only the cycle collector can free t.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
2556
2557
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the inherited TextIOWrapperTest cases; adds none of its own.

    Presumably the variant for the pure-Python implementation, with the
    implementation attributes set elsewhere in the file -- confirm.
    """
2560
2561
2562class IncrementalNewlineDecoderTest(unittest.TestCase):
2563
2564 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002565 # UTF-8 specific tests for a newline decoder
2566 def _check_decode(b, s, **kwargs):
2567 # We exercise getstate() / setstate() as well as decode()
2568 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002570 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002571 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002572
Antoine Pitrou180a3362008-12-14 16:36:46 +00002573 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002574
Antoine Pitrou180a3362008-12-14 16:36:46 +00002575 _check_decode(b'\xe8', "")
2576 _check_decode(b'\xa2', "")
2577 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002578
Antoine Pitrou180a3362008-12-14 16:36:46 +00002579 _check_decode(b'\xe8', "")
2580 _check_decode(b'\xa2', "")
2581 _check_decode(b'\x88', "\u8888")
2582
2583 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002584 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2585
Antoine Pitrou180a3362008-12-14 16:36:46 +00002586 decoder.reset()
2587 _check_decode(b'\n', "\n")
2588 _check_decode(b'\r', "")
2589 _check_decode(b'', "\n", final=True)
2590 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002591
Antoine Pitrou180a3362008-12-14 16:36:46 +00002592 _check_decode(b'\r', "")
2593 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594
Antoine Pitrou180a3362008-12-14 16:36:46 +00002595 _check_decode(b'\r\r\n', "\n\n")
2596 _check_decode(b'\r', "")
2597 _check_decode(b'\r', "\n")
2598 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002599
Antoine Pitrou180a3362008-12-14 16:36:46 +00002600 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2601 _check_decode(b'\xe8\xa2\x88', "\u8888")
2602 _check_decode(b'\n', "\n")
2603 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2604 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002605
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002606 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002607 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002608 if encoding is not None:
2609 encoder = codecs.getincrementalencoder(encoding)()
2610 def _decode_bytewise(s):
2611 # Decode one byte at a time
2612 for b in encoder.encode(s):
2613 result.append(decoder.decode(bytes([b])))
2614 else:
2615 encoder = None
2616 def _decode_bytewise(s):
2617 # Decode one char at a time
2618 for c in s:
2619 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002620 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002621 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002622 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002623 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002625 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002626 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002627 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002628 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002629 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002630 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002631 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002632 input = "abc"
2633 if encoder is not None:
2634 encoder.reset()
2635 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002636 self.assertEqual(decoder.decode(input), "abc")
2637 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002638
2639 def test_newline_decoder(self):
2640 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002641 # None meaning the IncrementalNewlineDecoder takes unicode input
2642 # rather than bytes input
2643 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002644 'utf-16', 'utf-16-le', 'utf-16-be',
2645 'utf-32', 'utf-32-le', 'utf-32-be',
2646 )
2647 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002648 decoder = enc and codecs.getincrementaldecoder(enc)()
2649 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2650 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002651 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002652 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2653 self.check_newline_decoding_utf8(decoder)
2654
Antoine Pitrou66913e22009-03-06 23:40:56 +00002655 def test_newline_bytes(self):
2656 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2657 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002658 self.assertEqual(dec.newlines, None)
2659 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2660 self.assertEqual(dec.newlines, None)
2661 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2662 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002663 dec = self.IncrementalNewlineDecoder(None, translate=False)
2664 _check(dec)
2665 dec = self.IncrementalNewlineDecoder(None, translate=True)
2666 _check(dec)
2667
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited IncrementalNewlineDecoderTest cases only.

    Presumably the variant for the C implementation, with the decoder
    class assigned elsewhere in the file -- confirm.
    """
2670
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited IncrementalNewlineDecoderTest cases only.

    Presumably the variant for the pure-Python implementation, with the
    decoder class assigned elsewhere in the file -- confirm.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002673
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002674
Guido van Rossum01a27522007-03-07 01:00:12 +00002675# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002676
Guido van Rossum5abbf752007-08-27 17:39:33 +00002677class MiscIOTest(unittest.TestCase):
2678
def tearDown(self):
    # Remove the scratch file after each test.
    scratch = support.TESTFN
    support.unlink(scratch)
2681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002682 def test___all__(self):
2683 for name in self.io.__all__:
2684 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002685 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002686 if name == "open":
2687 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002688 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002689 self.assertTrue(issubclass(obj, Exception), name)
2690 elif not name.startswith("SEEK_"):
2691 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002692
def test_attributes(self):
    # Check the name and mode attributes on the objects returned by
    # open() and on their buffer/raw layers.  Every file is managed by
    # "with" so a failing assertion cannot leak an open handle.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g reuses f's descriptor without owning it (closefd=False), so
        # it is closed first, before f closes the descriptor.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2719
def test_io_after_close(self):
    # For a range of modes and buffering settings, every operation on a
    # closed file object must raise ValueError.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "w", "buffering": 1},
        {"mode": "w", "buffering": 2},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "r", "buffering": 1},
        {"mode": "r", "buffering": 2},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+", "buffering": 1},
        {"mode": "w+", "buffering": 2},
        {"mode": "w+b", "buffering": 0},
    ]
    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # write() needs bytes for binary modes and str for text modes.
        empty = b"" if "b" in kwargs['mode'] else ""
        self.assertRaises(ValueError, f.flush)
        self.assertRaises(ValueError, f.fileno)
        self.assertRaises(ValueError, f.isatty)
        self.assertRaises(ValueError, f.__iter__)
        # Methods that exist only on some layers are checked if present.
        if hasattr(f, "peek"):
            self.assertRaises(ValueError, f.peek, 1)
        self.assertRaises(ValueError, f.read)
        if hasattr(f, "read1"):
            self.assertRaises(ValueError, f.read1, 1024)
        if hasattr(f, "readall"):
            self.assertRaises(ValueError, f.readall)
        if hasattr(f, "readinto"):
            self.assertRaises(ValueError, f.readinto, bytearray(1024))
        self.assertRaises(ValueError, f.readline)
        self.assertRaises(ValueError, f.readlines)
        self.assertRaises(ValueError, f.seek, 0)
        self.assertRaises(ValueError, f.tell)
        self.assertRaises(ValueError, f.truncate)
        self.assertRaises(ValueError, f.write, empty)
        self.assertRaises(ValueError, f.writelines, [])
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002762
def test_blockingioerror(self):
    # Various BlockingIOError issues.
    # A str subclass is used so that the value can carry attributes and
    # be weakly referenced.
    class AttrStr(str):
        pass
    text = AttrStr("")
    error = self.BlockingIOError(1, text)
    # Build a reference cycle between the exception and its argument.
    text.b = error
    error.c = text
    text_ref = weakref.ref(text)
    del text, error
    # After a collection the cycle must be gone.
    support.gc_collect()
    self.assertTrue(text_ref() is None, text_ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002775
def test_abcs(self):
    # Test the visible base classes are ABCs.
    visible_bases = (
        self.IOBase,
        self.RawIOBase,
        self.BufferedIOBase,
        self.TextIOBase,
    )
    for base in visible_bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002782
def _check_abc_inheritance(self, abcmodule):
    # Open the test file three ways and check that each resulting object
    # is an instance of abcmodule.IOBase and of exactly one of the three
    # layer ABCs taken from abcmodule.
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    # (positional args, keyword args, name of the one ABC expected)
    scenarios = (
        (("wb",), {"buffering": 0}, "RawIOBase"),
        (("wb",), {}, "BufferedIOBase"),
        (("w",), {}, "TextIOBase"),
    )
    for open_args, open_kwargs, expected_layer in scenarios:
        with self.open(support.TESTFN, *open_args, **open_kwargs) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layer_names:
                layer_abc = getattr(abcmodule, layer)
                if layer == expected_layer:
                    self.assertIsInstance(f, layer_abc)
                else:
                    self.assertNotIsInstance(f, layer_abc)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002799
def test_abc_inheritance(self):
    # The implementations must inherit from their respective ABCs; the
    # ABCs are looked up on the test case itself.
    abc_source = self
    self._check_abc_inheritance(abc_source)
2803
def test_abc_inheritance_official(self):
    # The implementations must also inherit from the official ABCs of
    # the baseline "io" module.
    abc_source = io
    self._check_abc_inheritance(abc_source)
2808
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the given arguments, drop the only reference to it
    # without closing it, and check that a ResourceWarning mentioning the
    # object's repr is emitted.
    # NOTE(review): this calls the builtin open() rather than self.open --
    # confirm that is intended.
    leaked = open(*args, **kwargs)
    expected_repr = repr(leaked)
    with self.assertWarns(ResourceWarning) as caught:
        leaked = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
2816
def test_warn_on_dealloc(self):
    # Run the dealloc-warning check for three ways of opening the test
    # file: unbuffered binary, buffered binary and text.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2821
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Same check as _check_warn_on_dealloc, but for a file object wrapping
    # the read end of a pipe; then check that nothing is warned about
    # when the object is opened with closefd=False.
    pipe_fds = []

    def close_pipe_fds():
        # Close every descriptor created here; one that is already
        # closed (EBADF) is not an error.
        for fd in pipe_fds:
            try:
                os.close(fd)
            except EnvironmentError as e:
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(close_pipe_fds)

    r, w = os.pipe()
    pipe_fds.extend((r, w))
    self._check_warn_on_dealloc(r, *args, **kwargs)

    # When using closefd=False, there's no warning
    r, w = os.pipe()
    pipe_fds.extend((r, w))
    with warnings.catch_warnings(record=True) as recorded:
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002842
def test_warn_on_dealloc_fd(self):
    # Run the file-descriptor variant of the dealloc-warning check for
    # unbuffered binary, buffered binary and text readers.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2847
2848
def test_pickling(self):
    # Pickling file objects is forbidden: pickle.dumps() must raise
    # TypeError for every open variant and every pickle protocol.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+b", "buffering": 0},
    ]
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_variants:
        for protocol in protocols:
            # The file is reopened for each (variant, protocol) pair.
            with self.open(support.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
2865
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Non-blocking pipe round trip with a 16 KiB buffer.
    bufsize = 16 * 1024
    self._test_nonblock_pipe_write(bufsize)
2869
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Non-blocking pipe round trip with a 1 KiB buffer.
    bufsize = 1024
    self._test_nonblock_pipe_write(bufsize)
2873
def _set_non_blocking(self, fd):
    # Add O_NONBLOCK to the status flags of fd, asserting that both
    # fcntl() calls report success.
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current_flags, -1)
    wanted_flags = current_flags | os.O_NONBLOCK
    status = fcntl.fcntl(fd, fcntl.F_SETFL, wanted_flags)
    self.assertEqual(status, 0)
2879
def _test_nonblock_pipe_write(self, bufsize):
    # Write through a non-blocking pipe until the writer reports
    # BlockingIOError, drain the reader, and finally check that the bytes
    # received are exactly the bytes accepted by the writer.
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                # Keep writing N-byte messages (a, b, c, ... repeated)
                # until the writer cannot accept any more.
                for i in count():
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only part of the last message was accepted.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Flush what is still buffered, draining the reader each time the
        # flush would block.
        while True:
            try:
                wf.flush()
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())
            else:
                break

        # Read until rf.read() returns None.
        received.extend(iter(rf.read, None))

    sent, received = b''.join(sent), b''.join(received)
    self.assertTrue(sent == received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2929
def test_create_fail(self):
    # 'x' mode fails if file is existing
    with self.open(support.TESTFN, 'w'):
        # Nothing to write: opening in 'w' mode is enough to make sure
        # the file exists.
        pass
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
2935
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    # Read the file back and compare with what was written.
    with self.open(support.TESTFN, 'rb') as reopened:
        self.assertEqual(payload, reopened.read())
2942
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
2946
2947
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest checks with ``io`` as the module under test.
    io = io
2950
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest checks with ``pyio`` as the module under test.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002953
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002954
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when SIGALRM interrupts an I/O call.

    The module under test is read from the ``io`` class attribute, which
    subclasses are expected to set.  Every check that arms an alarm cancels
    it again on exit, so a failing check cannot leave a pending SIGALRM
    behind.
    """

    def setUp(self):
        # Install a handler that raises, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm that is still armed before the previous handler
        # is restored, so that it cannot be delivered to that handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Signal handler: deliberately raises ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit that is repeated to build the large payload,
        *bytes* is what the first two bytes read back from the pipe are
        compared against, and *fdopen_kwargs* are passed to ``open()``
        (``closefd`` is forced to False).
        """
        read_results = []
        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below never hits
        # an unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Make sure no alarm is left armed if the check failed early.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to the file
        object that the interrupted code is itself writing to.

        *data* is the value written on every call and *fdopen_kwargs* are
        passed to ``open()``.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Make sure no alarm is left armed if the check failed early.
            signal.alarm(0)
            try:
                wio.close()
            finally:
                # Close the read end even if closing wio raised.
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by ``read()`` to str and
        *fdopen_kwargs* are passed to ``open()`` (``closefd`` is forced to
        False).
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name if open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Make sure no alarm is left armed if the check failed early.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the unit that is repeated to build the payload and
        *fdopen_kwargs* are passed to ``open()`` (``closefd`` is forced to
        False).
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with alarm2 as the handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader thread.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Make sure no alarm is left armed if the check failed early.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3144
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003145
class CSignalsTest(SignalsTest):
    # Run the SignalsTest checks with ``io`` as the module under test.
    io = io
3148
class PySignalsTest(SignalsTest):
    # Run the SignalsTest checks with ``pyio`` as the module under test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3156
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003157
def test_main():
    # Test classes to run.  Classes whose name starts with "C" or "Py" get
    # the matching namespace injected below; any other class is left
    # untouched.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in exported)
    py_io_ns = dict((name, getattr(pyio, name)) for name in exported)
    module_globals = globals()
    # Each mock is published under its plain name, but resolves to the
    # "C"- or "Py"-prefixed global of the same name.
    for prefix, namespace in (("C", c_io_ns), ("Py", py_io_ns)):
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)


if __name__ == "__main__":
    test_main()