blob: 555ad7444709acb450da31763ef9cf17c613d636 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051def _default_chunk_size():
52 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000053 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000054 return f._CHUNK_SIZE
55
56
Antoine Pitrou328ec742010-09-14 18:37:24 +000057class MockRawIOWithoutRead:
58 """A RawIO implementation without read(), so as to exercise the default
59 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000060
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000061 def __init__(self, read_stack=()):
62 self._read_stack = list(read_stack)
63 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000065 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000068 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000069 return len(b)
70
71 def writable(self):
72 return True
73
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000081 return True
82
Guido van Rossum01a27522007-03-07 01:00:12 +000083 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000085
86 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000096 return 0
97 if data is None:
98 del self._read_stack[0]
99 return None
100 n = len(data)
101 if len(data) <= max_len:
102 del self._read_stack[0]
103 buf[:n] = data
104 return n
105 else:
106 buf[:] = data[:max_len]
107 self._read_stack[0] = data[max_len:]
108 return max_len
109
110 def truncate(self, pos=None):
111 return pos
112
Antoine Pitrou328ec742010-09-14 18:37:24 +0000113class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
114 pass
115
116class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
117 pass
118
119
120class MockRawIO(MockRawIOWithoutRead):
121
122 def read(self, n=None):
123 self._reads += 1
124 try:
125 return self._read_stack.pop(0)
126 except:
127 self._extraneous_reads += 1
128 return b""
129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class CMockRawIO(MockRawIO, io.RawIOBase):
131 pass
132
133class PyMockRawIO(MockRawIO, pyio.RawIOBase):
134 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000137class MisbehavedRawIO(MockRawIO):
138 def write(self, b):
139 return super().write(b) * 2
140
141 def read(self, n=None):
142 return super().read(n) * 2
143
144 def seek(self, pos, whence):
145 return -123
146
147 def tell(self):
148 return -456
149
150 def readinto(self, buf):
151 super().readinto(buf)
152 return len(buf) * 5
153
154class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
155 pass
156
157class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
158 pass
159
160
161class CloseFailureIO(MockRawIO):
162 closed = 0
163
164 def close(self):
165 if not self.closed:
166 self.closed = 1
167 raise IOError
168
169class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
170 pass
171
172class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
173 pass
174
175
176class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def __init__(self, data):
179 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181
182 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000184 self.read_history.append(None if res is None else len(res))
185 return res
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187 def readinto(self, b):
188 res = super().readinto(b)
189 self.read_history.append(res)
190 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class CMockFileIO(MockFileIO, io.BytesIO):
193 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195class PyMockFileIO(MockFileIO, pyio.BytesIO):
196 pass
197
198
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000199class MockUnseekableIO:
200 def seekable(self):
201 return False
202
203 def seek(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206 def tell(self, *args):
207 raise self.UnsupportedOperation("not seekable")
208
209class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
210 UnsupportedOperation = io.UnsupportedOperation
211
212class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
213 UnsupportedOperation = pyio.UnsupportedOperation
214
215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216class MockNonBlockWriterIO:
217
218 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000219 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 def pop_written(self):
223 s = b"".join(self._write_stack)
224 self._write_stack[:] = []
225 return s
226
227 def block_on(self, char):
228 """Block when a given char is encountered."""
229 self._blocker_char = char
230
231 def readable(self):
232 return True
233
234 def seekable(self):
235 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Guido van Rossum01a27522007-03-07 01:00:12 +0000237 def writable(self):
238 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000240 def write(self, b):
241 b = bytes(b)
242 n = -1
243 if self._blocker_char:
244 try:
245 n = b.index(self._blocker_char)
246 except ValueError:
247 pass
248 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100249 if n > 0:
250 # write data up to the first blocker
251 self._write_stack.append(b[:n])
252 return n
253 else:
254 # cancel blocker and indicate would block
255 self._blocker_char = None
256 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000257 self._write_stack.append(b)
258 return len(b)
259
260class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
261 BlockingIOError = io.BlockingIOError
262
263class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
264 BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
Neal Norwitze7789b12008-03-24 06:18:09 +0000269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000272 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000273 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000277 f.truncate(0)
278 self.assertEqual(f.tell(), 5)
279 f.seek(0)
280
281 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000284 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000287 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000289 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(-1, 2), 13)
291 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000295 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
Guido van Rossum9b76da62007-04-11 01:09:03 +0000297 def read_ops(self, f, buffered=False):
298 data = f.read(5)
299 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.readinto(data), 5)
302 self.assertEqual(data, b" worl")
303 self.assertEqual(f.readinto(data), 2)
304 self.assertEqual(len(data), 5)
305 self.assertEqual(data[:2], b"d\n")
306 self.assertEqual(f.seek(0), 0)
307 self.assertEqual(f.read(20), b"hello world\n")
308 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000309 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000310 self.assertEqual(f.seek(-6, 2), 6)
311 self.assertEqual(f.read(5), b"world")
312 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000313 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000314 self.assertEqual(f.seek(-6, 1), 5)
315 self.assertEqual(f.read(5), b" worl")
316 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000317 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 if buffered:
319 f.seek(0)
320 self.assertEqual(f.read(), b"hello world\n")
321 f.seek(6)
322 self.assertEqual(f.read(), b"world\n")
323 self.assertEqual(f.read(), b"")
324
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 LARGE = 2**31
326
Guido van Rossum53807da2007-04-10 19:01:47 +0000327 def large_file_ops(self, f):
328 assert f.readable()
329 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(self.LARGE), self.LARGE)
331 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 3)
334 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
337 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
341 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 self.assertEqual(f.read(2), b"x")
343
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 def test_invalid_operations(self):
345 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000347 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "wb", buffering=0) as fp:
352 self.assertRaises(exc, fp.read)
353 self.assertRaises(exc, fp.readline)
354 with self.open(support.TESTFN, "rb", buffering=0) as fp:
355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, b"blah")
359 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000360 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000361 self.assertRaises(exc, fp.write, "blah")
362 self.assertRaises(exc, fp.writelines, ["blah\n"])
363 # Non-zero seeking from current or end pos
364 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
365 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
Antoine Pitrou13348842012-01-29 18:36:34 +0100367 def test_open_handles_NUL_chars(self):
368 fn_with_NUL = 'foo\0bar'
369 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
370 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
371
Guido van Rossum28524c72007-02-27 05:47:44 +0000372 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000373 with self.open(support.TESTFN, "wb", buffering=0) as f:
374 self.assertEqual(f.readable(), False)
375 self.assertEqual(f.writable(), True)
376 self.assertEqual(f.seekable(), True)
377 self.write_ops(f)
378 with self.open(support.TESTFN, "rb", buffering=0) as f:
379 self.assertEqual(f.readable(), True)
380 self.assertEqual(f.writable(), False)
381 self.assertEqual(f.seekable(), True)
382 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000383
Guido van Rossum87429772007-04-10 21:06:59 +0000384 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000385 with self.open(support.TESTFN, "wb") as f:
386 self.assertEqual(f.readable(), False)
387 self.assertEqual(f.writable(), True)
388 self.assertEqual(f.seekable(), True)
389 self.write_ops(f)
390 with self.open(support.TESTFN, "rb") as f:
391 self.assertEqual(f.readable(), True)
392 self.assertEqual(f.writable(), False)
393 self.assertEqual(f.seekable(), True)
394 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000395
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000396 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000397 with self.open(support.TESTFN, "wb") as f:
398 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
399 with self.open(support.TESTFN, "rb") as f:
400 self.assertEqual(f.readline(), b"abc\n")
401 self.assertEqual(f.readline(10), b"def\n")
402 self.assertEqual(f.readline(2), b"xy")
403 self.assertEqual(f.readline(4), b"zzy\n")
404 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000405 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000406 self.assertRaises(TypeError, f.readline, 5.3)
407 with self.open(support.TESTFN, "r") as f:
408 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000409
Guido van Rossum28524c72007-02-27 05:47:44 +0000410 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000412 self.write_ops(f)
413 data = f.getvalue()
414 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000416 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000417
Guido van Rossum53807da2007-04-10 19:01:47 +0000418 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000419 # On Windows and Mac OSX this test comsumes large resources; It takes
420 # a long time to build the >2GB file and takes >2GB of disk space
421 # therefore the resource must be enabled to run this test.
422 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000423 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000424 print("\nTesting large file ops skipped on %s." % sys.platform,
425 file=sys.stderr)
426 print("It requires %d bytes and a long time." % self.LARGE,
427 file=sys.stderr)
428 print("Use 'regrtest.py -u largefile test_io' to run it.",
429 file=sys.stderr)
430 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000440 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000445 1/0
446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
449 self.fail("1/0 didn't raise an exception")
450
Antoine Pitrou08838b62009-01-21 00:55:13 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000456 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000458 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000460 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000461
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def test_destructor(self):
463 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def __del__(self):
466 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 try:
468 f = super().__del__
469 except AttributeError:
470 pass
471 else:
472 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000473 def close(self):
474 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000475 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000476 def flush(self):
477 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000479 with support.check_warnings(('', ResourceWarning)):
480 f = MyFileIO(support.TESTFN, "wb")
481 f.write(b"xxx")
482 del f
483 support.gc_collect()
484 self.assertEqual(record, [1, 2, 3])
485 with self.open(support.TESTFN, "rb") as f:
486 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487
488 def _check_base_destructor(self, base):
489 record = []
490 class MyIO(base):
491 def __init__(self):
492 # This exercises the availability of attributes on object
493 # destruction.
494 # (in the C version, close() is called by the tp_dealloc
495 # function, not by __del__)
496 self.on_del = 1
497 self.on_close = 2
498 self.on_flush = 3
499 def __del__(self):
500 record.append(self.on_del)
501 try:
502 f = super().__del__
503 except AttributeError:
504 pass
505 else:
506 f()
507 def close(self):
508 record.append(self.on_close)
509 super().close()
510 def flush(self):
511 record.append(self.on_flush)
512 super().flush()
513 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000514 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000515 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000516 self.assertEqual(record, [1, 2, 3])
517
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 def test_IOBase_destructor(self):
519 self._check_base_destructor(self.IOBase)
520
521 def test_RawIOBase_destructor(self):
522 self._check_base_destructor(self.RawIOBase)
523
524 def test_BufferedIOBase_destructor(self):
525 self._check_base_destructor(self.BufferedIOBase)
526
527 def test_TextIOBase_destructor(self):
528 self._check_base_destructor(self.TextIOBase)
529
Guido van Rossum87429772007-04-10 21:06:59 +0000530 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb") as f:
532 f.write(b"xxx")
533 with self.open(support.TESTFN, "rb") as f:
534 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000535
Guido van Rossumd4103952007-04-12 05:44:49 +0000536 def test_array_writes(self):
537 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000538 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000539 with self.open(support.TESTFN, "wb", 0) as f:
540 self.assertEqual(f.write(a), n)
541 with self.open(support.TESTFN, "wb") as f:
542 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000543
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000544 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000546 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_read_closed(self):
549 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
552 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553 self.assertEqual(file.read(), "egg\n")
554 file.seek(0)
555 file.close()
556 self.assertRaises(ValueError, file.read)
557
558 def test_no_closefd_with_filename(self):
559 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000561
562 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000565 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000566 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000568 self.assertEqual(file.buffer.raw.closefd, False)
569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000570 def test_garbage_collection(self):
571 # FileIO objects are collected, and collecting them flushes
572 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000573 with support.check_warnings(('', ResourceWarning)):
574 f = self.FileIO(support.TESTFN, "wb")
575 f.write(b"abcxxx")
576 f.f = f
577 wr = weakref.ref(f)
578 del f
579 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000580 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000582 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000583
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 def test_unbounded_file(self):
585 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
586 zero = "/dev/zero"
587 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000589 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000590 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000591 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000592 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000595 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000596 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000598 self.assertRaises(OverflowError, f.read)
599
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 def test_flush_error_on_close(self):
601 f = self.open(support.TESTFN, "wb", buffering=0)
602 def bad_flush():
603 raise IOError()
604 f.flush = bad_flush
605 self.assertRaises(IOError, f.close) # exception not swallowed
606
607 def test_multi_close(self):
608 f = self.open(support.TESTFN, "wb", buffering=0)
609 f.close()
610 f.close()
611 f.close()
612 self.assertRaises(ValueError, f.flush)
613
Antoine Pitrou328ec742010-09-14 18:37:24 +0000614 def test_RawIOBase_read(self):
615 # Exercise the default RawIOBase.read() implementation (which calls
616 # readinto() internally).
617 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
618 self.assertEqual(rawio.read(2), b"ab")
619 self.assertEqual(rawio.read(2), b"c")
620 self.assertEqual(rawio.read(2), b"d")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"ef")
623 self.assertEqual(rawio.read(2), b"g")
624 self.assertEqual(rawio.read(2), None)
625 self.assertEqual(rawio.read(2), b"")
626
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400627 def test_types_have_dict(self):
628 test = (
629 self.IOBase(),
630 self.RawIOBase(),
631 self.TextIOBase(),
632 self.StringIO(),
633 self.BytesIO()
634 )
635 for obj in test:
636 self.assertTrue(hasattr(obj, "__dict__"))
637
Ross Lagerwall59142db2011-10-31 20:34:46 +0200638 def test_opener(self):
639 with self.open(support.TESTFN, "w") as f:
640 f.write("egg\n")
641 fd = os.open(support.TESTFN, os.O_RDONLY)
642 def opener(path, flags):
643 return fd
644 with self.open("non-existent", "r", opener=opener) as f:
645 self.assertEqual(f.read(), "egg\n")
646
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200647 def test_fileio_closefd(self):
648 # Issue #4841
649 with self.open(__file__, 'rb') as f1, \
650 self.open(__file__, 'rb') as f2:
651 fileio = self.FileIO(f1.fileno(), closefd=False)
652 # .__init__() must not close f1
653 fileio.__init__(f2.fileno(), closefd=False)
654 f1.readline()
655 # .close() must not close f2
656 fileio.close()
657 f2.readline()
658
659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200661
662 def test_IOBase_finalize(self):
663 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
664 # class which inherits IOBase and an object of this class are caught
665 # in a reference cycle and close() is already in the method cache.
666 class MyIO(self.IOBase):
667 def close(self):
668 pass
669
670 # create an instance to populate the method cache
671 MyIO()
672 obj = MyIO()
673 obj.obj = obj
674 wr = weakref.ref(obj)
675 del MyIO
676 del obj
677 support.gc_collect()
678 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000679
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000680class PyIOTest(IOTest):
681 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000682
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684class CommonBufferedTests:
685 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
686
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000687 def test_detach(self):
688 raw = self.MockRawIO()
689 buf = self.tp(raw)
690 self.assertIs(buf.detach(), raw)
691 self.assertRaises(ValueError, buf.detach)
692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 def test_fileno(self):
694 rawio = self.MockRawIO()
695 bufio = self.tp(rawio)
696
Ezio Melottib3aedd42010-11-20 19:04:17 +0000697 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698
699 def test_no_fileno(self):
700 # XXX will we always have fileno() function? If so, kill
701 # this test. Else, write it.
702 pass
703
704 def test_invalid_args(self):
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707 # Invalid whence
708 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200709 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000710
711 def test_override_destructor(self):
712 tp = self.tp
713 record = []
714 class MyBufferedIO(tp):
715 def __del__(self):
716 record.append(1)
717 try:
718 f = super().__del__
719 except AttributeError:
720 pass
721 else:
722 f()
723 def close(self):
724 record.append(2)
725 super().close()
726 def flush(self):
727 record.append(3)
728 super().flush()
729 rawio = self.MockRawIO()
730 bufio = MyBufferedIO(rawio)
731 writable = bufio.writable()
732 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000733 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 if writable:
735 self.assertEqual(record, [1, 2, 3])
736 else:
737 self.assertEqual(record, [1, 2])
738
739 def test_context_manager(self):
740 # Test usability as a context manager
741 rawio = self.MockRawIO()
742 bufio = self.tp(rawio)
743 def _with():
744 with bufio:
745 pass
746 _with()
747 # bufio should now be closed, and using it a second time should raise
748 # a ValueError.
749 self.assertRaises(ValueError, _with)
750
751 def test_error_through_destructor(self):
752 # Test that the exception state is not modified by a destructor,
753 # even if close() fails.
754 rawio = self.CloseFailureIO()
755 def f():
756 self.tp(rawio).xyzzy
757 with support.captured_output("stderr") as s:
758 self.assertRaises(AttributeError, f)
759 s = s.getvalue().strip()
760 if s:
761 # The destructor *may* have printed an unraisable error, check it
762 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000763 self.assertTrue(s.startswith("Exception IOError: "), s)
764 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000765
Antoine Pitrou716c4442009-05-23 19:04:03 +0000766 def test_repr(self):
767 raw = self.MockRawIO()
768 b = self.tp(raw)
769 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
770 self.assertEqual(repr(b), "<%s>" % clsname)
771 raw.name = "dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
773 raw.name = b"dummy"
774 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
775
Antoine Pitrou6be88762010-05-03 16:48:20 +0000776 def test_flush_error_on_close(self):
777 raw = self.MockRawIO()
778 def bad_flush():
779 raise IOError()
780 raw.flush = bad_flush
781 b = self.tp(raw)
782 self.assertRaises(IOError, b.close) # exception not swallowed
783
784 def test_multi_close(self):
785 raw = self.MockRawIO()
786 b = self.tp(raw)
787 b.close()
788 b.close()
789 b.close()
790 self.assertRaises(ValueError, b.flush)
791
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000792 def test_unseekable(self):
793 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
794 self.assertRaises(self.UnsupportedOperation, bufio.tell)
795 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
796
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000797 def test_readonly_attributes(self):
798 raw = self.MockRawIO()
799 buf = self.tp(raw)
800 x = self.MockRawIO()
801 with self.assertRaises(AttributeError):
802 buf.raw = x
803
Guido van Rossum78892e42007-04-06 17:31:18 +0000804
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200805class SizeofTest:
806
807 @support.cpython_only
808 def test_sizeof(self):
809 bufsize1 = 4096
810 bufsize2 = 8192
811 rawio = self.MockRawIO()
812 bufio = self.tp(rawio, buffer_size=bufsize1)
813 size = sys.getsizeof(bufio) - bufsize1
814 rawio = self.MockRawIO()
815 bufio = self.tp(rawio, buffer_size=bufsize2)
816 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
817
818
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
820 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000821
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000822 def test_constructor(self):
823 rawio = self.MockRawIO([b"abc"])
824 bufio = self.tp(rawio)
825 bufio.__init__(rawio)
826 bufio.__init__(rawio, buffer_size=1024)
827 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000828 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000829 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
830 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
831 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
832 rawio = self.MockRawIO([b"abc"])
833 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000834 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000835
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000836 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000837 for arg in (None, 7):
838 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
839 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000840 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841 # Invalid args
842 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000844 def test_read1(self):
845 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
846 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000847 self.assertEqual(b"a", bufio.read(1))
848 self.assertEqual(b"b", bufio.read1(1))
849 self.assertEqual(rawio._reads, 1)
850 self.assertEqual(b"c", bufio.read1(100))
851 self.assertEqual(rawio._reads, 1)
852 self.assertEqual(b"d", bufio.read1(100))
853 self.assertEqual(rawio._reads, 2)
854 self.assertEqual(b"efg", bufio.read1(100))
855 self.assertEqual(rawio._reads, 3)
856 self.assertEqual(b"", bufio.read1(100))
857 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000858 # Invalid args
859 self.assertRaises(ValueError, bufio.read1, -1)
860
861 def test_readinto(self):
862 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
863 bufio = self.tp(rawio)
864 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000865 self.assertEqual(bufio.readinto(b), 2)
866 self.assertEqual(b, b"ab")
867 self.assertEqual(bufio.readinto(b), 2)
868 self.assertEqual(b, b"cd")
869 self.assertEqual(bufio.readinto(b), 2)
870 self.assertEqual(b, b"ef")
871 self.assertEqual(bufio.readinto(b), 1)
872 self.assertEqual(b, b"gf")
873 self.assertEqual(bufio.readinto(b), 0)
874 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200875 rawio = self.MockRawIO((b"abc", None))
876 bufio = self.tp(rawio)
877 self.assertEqual(bufio.readinto(b), 2)
878 self.assertEqual(b, b"ab")
879 self.assertEqual(bufio.readinto(b), 1)
880 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000881
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000882 def test_readlines(self):
883 def bufio():
884 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
885 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000886 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
887 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
888 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000889
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000890 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000891 data = b"abcdefghi"
892 dlen = len(data)
893
894 tests = [
895 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
896 [ 100, [ 3, 3, 3], [ dlen ] ],
897 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
898 ]
899
900 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000901 rawio = self.MockFileIO(data)
902 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000903 pos = 0
904 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000905 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000906 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000907 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000908 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000910 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000911 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000912 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
913 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000914 self.assertEqual(b"abcd", bufio.read(6))
915 self.assertEqual(b"e", bufio.read(1))
916 self.assertEqual(b"fg", bufio.read())
917 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200918 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000919 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000920
Victor Stinnera80987f2011-05-25 22:47:16 +0200921 rawio = self.MockRawIO((b"a", None, None))
922 self.assertEqual(b"a", rawio.readall())
923 self.assertIsNone(rawio.readall())
924
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 def test_read_past_eof(self):
926 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
927 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000928
Ezio Melottib3aedd42010-11-20 19:04:17 +0000929 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000930
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000931 def test_read_all(self):
932 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
933 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000934
Ezio Melottib3aedd42010-11-20 19:04:17 +0000935 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000936
Victor Stinner45df8202010-04-28 22:31:17 +0000937 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000938 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000939 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000940 try:
941 # Write out many bytes with exactly the same number of 0's,
942 # 1's... 255's. This will help us check that concurrent reading
943 # doesn't duplicate or forget contents.
944 N = 1000
945 l = list(range(256)) * N
946 random.shuffle(l)
947 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000948 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000949 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000950 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000951 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000952 errors = []
953 results = []
954 def f():
955 try:
956 # Intra-buffer read then buffer-flushing read
957 for n in cycle([1, 19]):
958 s = bufio.read(n)
959 if not s:
960 break
961 # list.append() is atomic
962 results.append(s)
963 except Exception as e:
964 errors.append(e)
965 raise
966 threads = [threading.Thread(target=f) for x in range(20)]
967 for t in threads:
968 t.start()
969 time.sleep(0.02) # yield
970 for t in threads:
971 t.join()
972 self.assertFalse(errors,
973 "the following exceptions were caught: %r" % errors)
974 s = b''.join(results)
975 for i in range(256):
976 c = bytes(bytearray([i]))
977 self.assertEqual(s.count(c), N)
978 finally:
979 support.unlink(support.TESTFN)
980
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200981 def test_unseekable(self):
982 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
983 self.assertRaises(self.UnsupportedOperation, bufio.tell)
984 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
985 bufio.read(1)
986 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
987 self.assertRaises(self.UnsupportedOperation, bufio.tell)
988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 def test_misbehaved_io(self):
990 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
991 bufio = self.tp(rawio)
992 self.assertRaises(IOError, bufio.seek, 0)
993 self.assertRaises(IOError, bufio.tell)
994
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000995 def test_no_extraneous_read(self):
996 # Issue #9550; when the raw IO object has satisfied the read request,
997 # we should not issue any additional reads, otherwise it may block
998 # (e.g. socket).
999 bufsize = 16
1000 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1001 rawio = self.MockRawIO([b"x" * n])
1002 bufio = self.tp(rawio, bufsize)
1003 self.assertEqual(bufio.read(n), b"x" * n)
1004 # Simple case: one raw read is enough to satisfy the request.
1005 self.assertEqual(rawio._extraneous_reads, 0,
1006 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1007 # A more complex case where two raw reads are needed to satisfy
1008 # the request.
1009 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1010 bufio = self.tp(rawio, bufsize)
1011 self.assertEqual(bufio.read(n), b"x" * n)
1012 self.assertEqual(rawio._extraneous_reads, 0,
1013 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1014
1015
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001016class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001017 tp = io.BufferedReader
1018
1019 def test_constructor(self):
1020 BufferedReaderTest.test_constructor(self)
1021 # The allocation can succeed on 32-bit builds, e.g. with more
1022 # than 2GB RAM and a 64-bit kernel.
1023 if sys.maxsize > 0x7FFFFFFF:
1024 rawio = self.MockRawIO()
1025 bufio = self.tp(rawio)
1026 self.assertRaises((OverflowError, MemoryError, ValueError),
1027 bufio.__init__, rawio, sys.maxsize)
1028
1029 def test_initialization(self):
1030 rawio = self.MockRawIO([b"abc"])
1031 bufio = self.tp(rawio)
1032 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1033 self.assertRaises(ValueError, bufio.read)
1034 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1035 self.assertRaises(ValueError, bufio.read)
1036 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1037 self.assertRaises(ValueError, bufio.read)
1038
1039 def test_misbehaved_io_read(self):
1040 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1041 bufio = self.tp(rawio)
1042 # _pyio.BufferedReader seems to implement reading different, so that
1043 # checking this is not so easy.
1044 self.assertRaises(IOError, bufio.read, 10)
1045
1046 def test_garbage_collection(self):
1047 # C BufferedReader objects are collected.
1048 # The Python version has __del__, so it ends into gc.garbage instead
1049 rawio = self.FileIO(support.TESTFN, "w+b")
1050 f = self.tp(rawio)
1051 f.f = f
1052 wr = weakref.ref(f)
1053 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001054 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001055 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001056
1057class PyBufferedReaderTest(BufferedReaderTest):
1058 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001059
Guido van Rossuma9e20242007-03-08 00:43:48 +00001060
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001061class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1062 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001063
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001064 def test_constructor(self):
1065 rawio = self.MockRawIO()
1066 bufio = self.tp(rawio)
1067 bufio.__init__(rawio)
1068 bufio.__init__(rawio, buffer_size=1024)
1069 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001070 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 bufio.flush()
1072 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1073 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1074 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1075 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001076 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001077 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001078 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001080 def test_detach_flush(self):
1081 raw = self.MockRawIO()
1082 buf = self.tp(raw)
1083 buf.write(b"howdy!")
1084 self.assertFalse(raw._write_stack)
1085 buf.detach()
1086 self.assertEqual(raw._write_stack, [b"howdy!"])
1087
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001089 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 writer = self.MockRawIO()
1091 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001092 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001093 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001094
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001095 def test_write_overflow(self):
1096 writer = self.MockRawIO()
1097 bufio = self.tp(writer, 8)
1098 contents = b"abcdefghijklmnop"
1099 for n in range(0, len(contents), 3):
1100 bufio.write(contents[n:n+3])
1101 flushed = b"".join(writer._write_stack)
1102 # At least (total - 8) bytes were implicitly flushed, perhaps more
1103 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001104 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001105
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001106 def check_writes(self, intermediate_func):
1107 # Lots of writes, test the flushed output is as expected.
1108 contents = bytes(range(256)) * 1000
1109 n = 0
1110 writer = self.MockRawIO()
1111 bufio = self.tp(writer, 13)
1112 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1113 def gen_sizes():
1114 for size in count(1):
1115 for i in range(15):
1116 yield size
1117 sizes = gen_sizes()
1118 while n < len(contents):
1119 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001120 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001121 intermediate_func(bufio)
1122 n += size
1123 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001124 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 def test_writes(self):
1127 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001128
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001129 def test_writes_and_flushes(self):
1130 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132 def test_writes_and_seeks(self):
1133 def _seekabs(bufio):
1134 pos = bufio.tell()
1135 bufio.seek(pos + 1, 0)
1136 bufio.seek(pos - 1, 0)
1137 bufio.seek(pos, 0)
1138 self.check_writes(_seekabs)
1139 def _seekrel(bufio):
1140 pos = bufio.seek(0, 1)
1141 bufio.seek(+1, 1)
1142 bufio.seek(-1, 1)
1143 bufio.seek(pos, 0)
1144 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001145
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001146 def test_writes_and_truncates(self):
1147 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001148
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001149 def test_write_non_blocking(self):
1150 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001151 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001152
Ezio Melottib3aedd42010-11-20 19:04:17 +00001153 self.assertEqual(bufio.write(b"abcd"), 4)
1154 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001155 # 1 byte will be written, the rest will be buffered
1156 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001157 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001159 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1160 raw.block_on(b"0")
1161 try:
1162 bufio.write(b"opqrwxyz0123456789")
1163 except self.BlockingIOError as e:
1164 written = e.characters_written
1165 else:
1166 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001167 self.assertEqual(written, 16)
1168 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001170
Ezio Melottib3aedd42010-11-20 19:04:17 +00001171 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172 s = raw.pop_written()
1173 # Previously buffered bytes were flushed
1174 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001176 def test_write_and_rewind(self):
1177 raw = io.BytesIO()
1178 bufio = self.tp(raw, 4)
1179 self.assertEqual(bufio.write(b"abcdef"), 6)
1180 self.assertEqual(bufio.tell(), 6)
1181 bufio.seek(0, 0)
1182 self.assertEqual(bufio.write(b"XY"), 2)
1183 bufio.seek(6, 0)
1184 self.assertEqual(raw.getvalue(), b"XYcdef")
1185 self.assertEqual(bufio.write(b"123456"), 6)
1186 bufio.flush()
1187 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189 def test_flush(self):
1190 writer = self.MockRawIO()
1191 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001192 bufio.write(b"abc")
1193 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001194 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001195
Antoine Pitrou131a4892012-10-16 22:57:11 +02001196 def test_writelines(self):
1197 l = [b'ab', b'cd', b'ef']
1198 writer = self.MockRawIO()
1199 bufio = self.tp(writer, 8)
1200 bufio.writelines(l)
1201 bufio.flush()
1202 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1203
1204 def test_writelines_userlist(self):
1205 l = UserList([b'ab', b'cd', b'ef'])
1206 writer = self.MockRawIO()
1207 bufio = self.tp(writer, 8)
1208 bufio.writelines(l)
1209 bufio.flush()
1210 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1211
1212 def test_writelines_error(self):
1213 writer = self.MockRawIO()
1214 bufio = self.tp(writer, 8)
1215 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1216 self.assertRaises(TypeError, bufio.writelines, None)
1217 self.assertRaises(TypeError, bufio.writelines, 'abc')
1218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219 def test_destructor(self):
1220 writer = self.MockRawIO()
1221 bufio = self.tp(writer, 8)
1222 bufio.write(b"abc")
1223 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001224 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001225 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001226
1227 def test_truncate(self):
1228 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001229 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001230 bufio = self.tp(raw, 8)
1231 bufio.write(b"abcdef")
1232 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001233 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001234 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235 self.assertEqual(f.read(), b"abc")
1236
Victor Stinner45df8202010-04-28 22:31:17 +00001237 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001238 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001240 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001241 # Write out many bytes from many threads and test they were
1242 # all flushed.
1243 N = 1000
1244 contents = bytes(range(256)) * N
1245 sizes = cycle([1, 19])
1246 n = 0
1247 queue = deque()
1248 while n < len(contents):
1249 size = next(sizes)
1250 queue.append(contents[n:n+size])
1251 n += size
1252 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001253 # We use a real file object because it allows us to
1254 # exercise situations where the GIL is released before
1255 # writing the buffer to the raw streams. This is in addition
1256 # to concurrency issues due to switching threads in the middle
1257 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001258 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001259 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001260 errors = []
1261 def f():
1262 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263 while True:
1264 try:
1265 s = queue.popleft()
1266 except IndexError:
1267 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001268 bufio.write(s)
1269 except Exception as e:
1270 errors.append(e)
1271 raise
1272 threads = [threading.Thread(target=f) for x in range(20)]
1273 for t in threads:
1274 t.start()
1275 time.sleep(0.02) # yield
1276 for t in threads:
1277 t.join()
1278 self.assertFalse(errors,
1279 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001280 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001281 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001282 s = f.read()
1283 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001284 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001285 finally:
1286 support.unlink(support.TESTFN)
1287
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 def test_misbehaved_io(self):
1289 rawio = self.MisbehavedRawIO()
1290 bufio = self.tp(rawio, 5)
1291 self.assertRaises(IOError, bufio.seek, 0)
1292 self.assertRaises(IOError, bufio.tell)
1293 self.assertRaises(IOError, bufio.write, b"abcdef")
1294
Florent Xicluna109d5732012-07-07 17:03:22 +02001295 def test_max_buffer_size_removal(self):
1296 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001297 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001298
1299
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001300class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001301 tp = io.BufferedWriter
1302
1303 def test_constructor(self):
1304 BufferedWriterTest.test_constructor(self)
1305 # The allocation can succeed on 32-bit builds, e.g. with more
1306 # than 2GB RAM and a 64-bit kernel.
1307 if sys.maxsize > 0x7FFFFFFF:
1308 rawio = self.MockRawIO()
1309 bufio = self.tp(rawio)
1310 self.assertRaises((OverflowError, MemoryError, ValueError),
1311 bufio.__init__, rawio, sys.maxsize)
1312
1313 def test_initialization(self):
1314 rawio = self.MockRawIO()
1315 bufio = self.tp(rawio)
1316 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1317 self.assertRaises(ValueError, bufio.write, b"def")
1318 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1319 self.assertRaises(ValueError, bufio.write, b"def")
1320 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1321 self.assertRaises(ValueError, bufio.write, b"def")
1322
1323 def test_garbage_collection(self):
1324 # C BufferedWriter objects are collected, and collecting them flushes
1325 # all data to disk.
1326 # The Python version has __del__, so it ends into gc.garbage instead
1327 rawio = self.FileIO(support.TESTFN, "w+b")
1328 f = self.tp(rawio)
1329 f.write(b"123xxx")
1330 f.x = f
1331 wr = weakref.ref(f)
1332 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001333 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001334 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001335 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336 self.assertEqual(f.read(), b"123xxx")
1337
1338
1339class PyBufferedWriterTest(BufferedWriterTest):
1340 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001341
Guido van Rossum01a27522007-03-07 01:00:12 +00001342class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001343
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001344 def test_constructor(self):
1345 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001346 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001347
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001348 def test_detach(self):
1349 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1350 self.assertRaises(self.UnsupportedOperation, pair.detach)
1351
Florent Xicluna109d5732012-07-07 17:03:22 +02001352 def test_constructor_max_buffer_size_removal(self):
1353 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001354 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001355
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001356 def test_constructor_with_not_readable(self):
1357 class NotReadable(MockRawIO):
1358 def readable(self):
1359 return False
1360
1361 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1362
1363 def test_constructor_with_not_writeable(self):
1364 class NotWriteable(MockRawIO):
1365 def writable(self):
1366 return False
1367
1368 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1369
1370 def test_read(self):
1371 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1372
1373 self.assertEqual(pair.read(3), b"abc")
1374 self.assertEqual(pair.read(1), b"d")
1375 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001376 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1377 self.assertEqual(pair.read(None), b"abc")
1378
1379 def test_readlines(self):
1380 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1381 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1382 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1383 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001384
1385 def test_read1(self):
1386 # .read1() is delegated to the underlying reader object, so this test
1387 # can be shallow.
1388 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1389
1390 self.assertEqual(pair.read1(3), b"abc")
1391
1392 def test_readinto(self):
1393 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1394
1395 data = bytearray(5)
1396 self.assertEqual(pair.readinto(data), 5)
1397 self.assertEqual(data, b"abcde")
1398
1399 def test_write(self):
1400 w = self.MockRawIO()
1401 pair = self.tp(self.MockRawIO(), w)
1402
1403 pair.write(b"abc")
1404 pair.flush()
1405 pair.write(b"def")
1406 pair.flush()
1407 self.assertEqual(w._write_stack, [b"abc", b"def"])
1408
1409 def test_peek(self):
1410 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1411
1412 self.assertTrue(pair.peek(3).startswith(b"abc"))
1413 self.assertEqual(pair.read(3), b"abc")
1414
1415 def test_readable(self):
1416 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1417 self.assertTrue(pair.readable())
1418
1419 def test_writeable(self):
1420 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1421 self.assertTrue(pair.writable())
1422
1423 def test_seekable(self):
1424 # BufferedRWPairs are never seekable, even if their readers and writers
1425 # are.
1426 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1427 self.assertFalse(pair.seekable())
1428
1429 # .flush() is delegated to the underlying writer object and has been
1430 # tested in the test_write method.
1431
1432 def test_close_and_closed(self):
1433 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1434 self.assertFalse(pair.closed)
1435 pair.close()
1436 self.assertTrue(pair.closed)
1437
1438 def test_isatty(self):
1439 class SelectableIsAtty(MockRawIO):
1440 def __init__(self, isatty):
1441 MockRawIO.__init__(self)
1442 self._isatty = isatty
1443
1444 def isatty(self):
1445 return self._isatty
1446
1447 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1448 self.assertFalse(pair.isatty())
1449
1450 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1451 self.assertTrue(pair.isatty())
1452
1453 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1454 self.assertTrue(pair.isatty())
1455
1456 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1457 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001458
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001459class CBufferedRWPairTest(BufferedRWPairTest):
1460 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001461
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001462class PyBufferedRWPairTest(BufferedRWPairTest):
1463 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001464
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001465
1466class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1467 read_mode = "rb+"
1468 write_mode = "wb+"
1469
1470 def test_constructor(self):
1471 BufferedReaderTest.test_constructor(self)
1472 BufferedWriterTest.test_constructor(self)
1473
1474 def test_read_and_write(self):
1475 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001476 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001477
1478 self.assertEqual(b"as", rw.read(2))
1479 rw.write(b"ddd")
1480 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001481 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001482 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001483 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001484
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001485 def test_seek_and_tell(self):
1486 raw = self.BytesIO(b"asdfghjkl")
1487 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001488
Ezio Melottib3aedd42010-11-20 19:04:17 +00001489 self.assertEqual(b"as", rw.read(2))
1490 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001491 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001492 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001493
Antoine Pitroue05565e2011-08-20 14:39:23 +02001494 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001495 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001496 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001497 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001498 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001499 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001500 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001501 self.assertEqual(7, rw.tell())
1502 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001503 rw.flush()
1504 self.assertEqual(b"asdf123fl", raw.getvalue())
1505
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001506 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001507
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001508 def check_flush_and_read(self, read_func):
1509 raw = self.BytesIO(b"abcdefghi")
1510 bufio = self.tp(raw)
1511
Ezio Melottib3aedd42010-11-20 19:04:17 +00001512 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001513 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001514 self.assertEqual(b"ef", read_func(bufio, 2))
1515 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001516 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001517 self.assertEqual(6, bufio.tell())
1518 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001519 raw.seek(0, 0)
1520 raw.write(b"XYZ")
1521 # flush() resets the read buffer
1522 bufio.flush()
1523 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001524 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525
1526 def test_flush_and_read(self):
1527 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1528
1529 def test_flush_and_readinto(self):
1530 def _readinto(bufio, n=-1):
1531 b = bytearray(n if n >= 0 else 9999)
1532 n = bufio.readinto(b)
1533 return bytes(b[:n])
1534 self.check_flush_and_read(_readinto)
1535
1536 def test_flush_and_peek(self):
1537 def _peek(bufio, n=-1):
1538 # This relies on the fact that the buffer can contain the whole
1539 # raw stream, otherwise peek() can return less.
1540 b = bufio.peek(n)
1541 if n != -1:
1542 b = b[:n]
1543 bufio.seek(len(b), 1)
1544 return b
1545 self.check_flush_and_read(_peek)
1546
1547 def test_flush_and_write(self):
1548 raw = self.BytesIO(b"abcdefghi")
1549 bufio = self.tp(raw)
1550
1551 bufio.write(b"123")
1552 bufio.flush()
1553 bufio.write(b"45")
1554 bufio.flush()
1555 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001556 self.assertEqual(b"12345fghi", raw.getvalue())
1557 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001558
1559 def test_threads(self):
1560 BufferedReaderTest.test_threads(self)
1561 BufferedWriterTest.test_threads(self)
1562
1563 def test_writes_and_peek(self):
1564 def _peek(bufio):
1565 bufio.peek(1)
1566 self.check_writes(_peek)
1567 def _peek(bufio):
1568 pos = bufio.tell()
1569 bufio.seek(-1, 1)
1570 bufio.peek(1)
1571 bufio.seek(pos, 0)
1572 self.check_writes(_peek)
1573
1574 def test_writes_and_reads(self):
1575 def _read(bufio):
1576 bufio.seek(-1, 1)
1577 bufio.read(1)
1578 self.check_writes(_read)
1579
1580 def test_writes_and_read1s(self):
1581 def _read1(bufio):
1582 bufio.seek(-1, 1)
1583 bufio.read1(1)
1584 self.check_writes(_read1)
1585
1586 def test_writes_and_readintos(self):
1587 def _read(bufio):
1588 bufio.seek(-1, 1)
1589 bufio.readinto(bytearray(1))
1590 self.check_writes(_read)
1591
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001592 def test_write_after_readahead(self):
1593 # Issue #6629: writing after the buffer was filled by readahead should
1594 # first rewind the raw stream.
1595 for overwrite_size in [1, 5]:
1596 raw = self.BytesIO(b"A" * 10)
1597 bufio = self.tp(raw, 4)
1598 # Trigger readahead
1599 self.assertEqual(bufio.read(1), b"A")
1600 self.assertEqual(bufio.tell(), 1)
1601 # Overwriting should rewind the raw stream if it needs so
1602 bufio.write(b"B" * overwrite_size)
1603 self.assertEqual(bufio.tell(), overwrite_size + 1)
1604 # If the write size was smaller than the buffer size, flush() and
1605 # check that rewind happens.
1606 bufio.flush()
1607 self.assertEqual(bufio.tell(), overwrite_size + 1)
1608 s = raw.getvalue()
1609 self.assertEqual(s,
1610 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1611
Antoine Pitrou7c404892011-05-13 00:13:33 +02001612 def test_write_rewind_write(self):
1613 # Various combinations of reading / writing / seeking backwards / writing again
1614 def mutate(bufio, pos1, pos2):
1615 assert pos2 >= pos1
1616 # Fill the buffer
1617 bufio.seek(pos1)
1618 bufio.read(pos2 - pos1)
1619 bufio.write(b'\x02')
1620 # This writes earlier than the previous write, but still inside
1621 # the buffer.
1622 bufio.seek(pos1)
1623 bufio.write(b'\x01')
1624
1625 b = b"\x80\x81\x82\x83\x84"
1626 for i in range(0, len(b)):
1627 for j in range(i, len(b)):
1628 raw = self.BytesIO(b)
1629 bufio = self.tp(raw, 100)
1630 mutate(bufio, i, j)
1631 bufio.flush()
1632 expected = bytearray(b)
1633 expected[j] = 2
1634 expected[i] = 1
1635 self.assertEqual(raw.getvalue(), expected,
1636 "failed result for i=%d, j=%d" % (i, j))
1637
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001638 def test_truncate_after_read_or_write(self):
1639 raw = self.BytesIO(b"A" * 10)
1640 bufio = self.tp(raw, 100)
1641 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1642 self.assertEqual(bufio.truncate(), 2)
1643 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1644 self.assertEqual(bufio.truncate(), 4)
1645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001646 def test_misbehaved_io(self):
1647 BufferedReaderTest.test_misbehaved_io(self)
1648 BufferedWriterTest.test_misbehaved_io(self)
1649
Antoine Pitroue05565e2011-08-20 14:39:23 +02001650 def test_interleaved_read_write(self):
1651 # Test for issue #12213
1652 with self.BytesIO(b'abcdefgh') as raw:
1653 with self.tp(raw, 100) as f:
1654 f.write(b"1")
1655 self.assertEqual(f.read(1), b'b')
1656 f.write(b'2')
1657 self.assertEqual(f.read1(1), b'd')
1658 f.write(b'3')
1659 buf = bytearray(1)
1660 f.readinto(buf)
1661 self.assertEqual(buf, b'f')
1662 f.write(b'4')
1663 self.assertEqual(f.peek(1), b'h')
1664 f.flush()
1665 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1666
1667 with self.BytesIO(b'abc') as raw:
1668 with self.tp(raw, 100) as f:
1669 self.assertEqual(f.read(1), b'a')
1670 f.write(b"2")
1671 self.assertEqual(f.read(1), b'c')
1672 f.flush()
1673 self.assertEqual(raw.getvalue(), b'a2c')
1674
1675 def test_interleaved_readline_write(self):
1676 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1677 with self.tp(raw) as f:
1678 f.write(b'1')
1679 self.assertEqual(f.readline(), b'b\n')
1680 f.write(b'2')
1681 self.assertEqual(f.readline(), b'def\n')
1682 f.write(b'3')
1683 self.assertEqual(f.readline(), b'\n')
1684 f.flush()
1685 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1686
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001687 # You can't construct a BufferedRandom over a non-seekable stream.
1688 test_unseekable = None
1689
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001690class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001691 tp = io.BufferedRandom
1692
1693 def test_constructor(self):
1694 BufferedRandomTest.test_constructor(self)
1695 # The allocation can succeed on 32-bit builds, e.g. with more
1696 # than 2GB RAM and a 64-bit kernel.
1697 if sys.maxsize > 0x7FFFFFFF:
1698 rawio = self.MockRawIO()
1699 bufio = self.tp(rawio)
1700 self.assertRaises((OverflowError, MemoryError, ValueError),
1701 bufio.__init__, rawio, sys.maxsize)
1702
1703 def test_garbage_collection(self):
1704 CBufferedReaderTest.test_garbage_collection(self)
1705 CBufferedWriterTest.test_garbage_collection(self)
1706
1707class PyBufferedRandomTest(BufferedRandomTest):
1708 tp = pyio.BufferedRandom
1709
1710
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001711# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1712# properties:
1713# - A single output character can correspond to many bytes of input.
1714# - The number of input bytes to complete the character can be
1715# undetermined until the last input byte is received.
1716# - The number of input bytes can vary depending on previous input.
1717# - A single input byte can correspond to many characters of output.
1718# - The number of output characters can be undetermined until the
1719# last input byte is received.
1720# - The number of output characters can vary depending on previous input.
1721
1722class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1723 """
1724 For testing seek/tell behavior with a stateful, buffering decoder.
1725
1726 Input is a sequence of words. Words may be fixed-length (length set
1727 by input) or variable-length (period-terminated). In variable-length
1728 mode, extra periods are ignored. Possible words are:
1729 - 'i' followed by a number sets the input length, I (maximum 99).
1730 When I is set to 0, words are space-terminated.
1731 - 'o' followed by a number sets the output length, O (maximum 99).
1732 - Any other word is converted into a word followed by a period on
1733 the output. The output word consists of the input word truncated
1734 or padded out with hyphens to make its length equal to O. If O
1735 is 0, the word is output verbatim without truncating or padding.
1736 I and O are initially set to 1. When I changes, any buffered input is
1737 re-scanned according to the new I. EOF also terminates the last word.
1738 """
1739
1740 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001741 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001742 self.reset()
1743
1744 def __repr__(self):
1745 return '<SID %x>' % id(self)
1746
1747 def reset(self):
1748 self.i = 1
1749 self.o = 1
1750 self.buffer = bytearray()
1751
1752 def getstate(self):
1753 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1754 return bytes(self.buffer), i*100 + o
1755
1756 def setstate(self, state):
1757 buffer, io = state
1758 self.buffer = bytearray(buffer)
1759 i, o = divmod(io, 100)
1760 self.i, self.o = i ^ 1, o ^ 1
1761
1762 def decode(self, input, final=False):
1763 output = ''
1764 for b in input:
1765 if self.i == 0: # variable-length, terminated with period
1766 if b == ord('.'):
1767 if self.buffer:
1768 output += self.process_word()
1769 else:
1770 self.buffer.append(b)
1771 else: # fixed-length, terminate after self.i bytes
1772 self.buffer.append(b)
1773 if len(self.buffer) == self.i:
1774 output += self.process_word()
1775 if final and self.buffer: # EOF terminates the last word
1776 output += self.process_word()
1777 return output
1778
1779 def process_word(self):
1780 output = ''
1781 if self.buffer[0] == ord('i'):
1782 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1783 elif self.buffer[0] == ord('o'):
1784 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1785 else:
1786 output = self.buffer.decode('ascii')
1787 if len(output) < self.o:
1788 output += '-'*self.o # pad out with hyphens
1789 if self.o:
1790 output = output[:self.o] # truncate to output length
1791 output += '.'
1792 self.buffer = bytearray()
1793 return output
1794
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001795 codecEnabled = False
1796
1797 @classmethod
1798 def lookupTestDecoder(cls, name):
1799 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001800 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001801 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001802 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001803 incrementalencoder=None,
1804 streamreader=None, streamwriter=None,
1805 incrementaldecoder=cls)
1806
1807# Register the previous decoder for testing.
1808# Disabled by default, tests will enable it.
1809codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1810
1811
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001812class StatefulIncrementalDecoderTest(unittest.TestCase):
1813 """
1814 Make sure the StatefulIncrementalDecoder actually works.
1815 """
1816
1817 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001818 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001819 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001820 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001821 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001822 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001823 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001824 # I=0, O=6 (variable-length input, fixed-length output)
1825 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1826 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001827 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001828 # I=6, O=3 (fixed-length input > fixed-length output)
1829 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1830 # I=0, then 3; O=29, then 15 (with longer output)
1831 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1832 'a----------------------------.' +
1833 'b----------------------------.' +
1834 'cde--------------------------.' +
1835 'abcdefghijabcde.' +
1836 'a.b------------.' +
1837 '.c.------------.' +
1838 'd.e------------.' +
1839 'k--------------.' +
1840 'l--------------.' +
1841 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001842 ]
1843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001844 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001845 # Try a few one-shot test cases.
1846 for input, eof, output in self.test_cases:
1847 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001848 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001849
1850 # Also test an unfinished decode, followed by forcing EOF.
1851 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001852 self.assertEqual(d.decode(b'oiabcd'), '')
1853 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001854
1855class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001856
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001857 def setUp(self):
1858 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1859 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001860 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001861
Guido van Rossumd0712812007-04-11 16:32:43 +00001862 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001863 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001864
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001865 def test_constructor(self):
1866 r = self.BytesIO(b"\xc3\xa9\n\n")
1867 b = self.BufferedReader(r, 1000)
1868 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001869 t.__init__(b, encoding="latin-1", newline="\r\n")
1870 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001871 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001872 t.__init__(b, encoding="utf-8", line_buffering=True)
1873 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001874 self.assertEqual(t.line_buffering, True)
1875 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 self.assertRaises(TypeError, t.__init__, b, newline=42)
1877 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1878
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001879 def test_detach(self):
1880 r = self.BytesIO()
1881 b = self.BufferedWriter(r)
1882 t = self.TextIOWrapper(b)
1883 self.assertIs(t.detach(), b)
1884
1885 t = self.TextIOWrapper(b, encoding="ascii")
1886 t.write("howdy")
1887 self.assertFalse(r.getvalue())
1888 t.detach()
1889 self.assertEqual(r.getvalue(), b"howdy")
1890 self.assertRaises(ValueError, t.detach)
1891
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001892 def test_repr(self):
1893 raw = self.BytesIO("hello".encode("utf-8"))
1894 b = self.BufferedReader(raw)
1895 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001896 modname = self.TextIOWrapper.__module__
1897 self.assertEqual(repr(t),
1898 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1899 raw.name = "dummy"
1900 self.assertEqual(repr(t),
1901 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001902 t.mode = "r"
1903 self.assertEqual(repr(t),
1904 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001905 raw.name = b"dummy"
1906 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001907 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001909 def test_line_buffering(self):
1910 r = self.BytesIO()
1911 b = self.BufferedWriter(r, 1000)
1912 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001913 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001914 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001915 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001916 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001917 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001918 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001919
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001920 def test_default_encoding(self):
1921 old_environ = dict(os.environ)
1922 try:
1923 # try to get a user preferred encoding different than the current
1924 # locale encoding to check that TextIOWrapper() uses the current
1925 # locale encoding and not the user preferred encoding
1926 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1927 if key in os.environ:
1928 del os.environ[key]
1929
1930 current_locale_encoding = locale.getpreferredencoding(False)
1931 b = self.BytesIO()
1932 t = self.TextIOWrapper(b)
1933 self.assertEqual(t.encoding, current_locale_encoding)
1934 finally:
1935 os.environ.clear()
1936 os.environ.update(old_environ)
1937
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001938 def test_encoding(self):
1939 # Check the encoding attribute is always set, and valid
1940 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001941 t = self.TextIOWrapper(b, encoding="utf-8")
1942 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001944 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001945 codecs.lookup(t.encoding)
1946
1947 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001948 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001949 b = self.BytesIO(b"abc\n\xff\n")
1950 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001951 self.assertRaises(UnicodeError, t.read)
1952 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001953 b = self.BytesIO(b"abc\n\xff\n")
1954 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001955 self.assertRaises(UnicodeError, t.read)
1956 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001957 b = self.BytesIO(b"abc\n\xff\n")
1958 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001959 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001960 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961 b = self.BytesIO(b"abc\n\xff\n")
1962 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001965 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001966 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001967 b = self.BytesIO()
1968 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001969 self.assertRaises(UnicodeError, t.write, "\xff")
1970 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001971 b = self.BytesIO()
1972 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001973 self.assertRaises(UnicodeError, t.write, "\xff")
1974 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001975 b = self.BytesIO()
1976 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001977 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001978 t.write("abc\xffdef\n")
1979 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001980 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001981 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982 b = self.BytesIO()
1983 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001984 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001985 t.write("abc\xffdef\n")
1986 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001987 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001990 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1991
1992 tests = [
1993 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001994 [ '', input_lines ],
1995 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1996 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1997 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001998 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001999 encodings = (
2000 'utf-8', 'latin-1',
2001 'utf-16', 'utf-16-le', 'utf-16-be',
2002 'utf-32', 'utf-32-le', 'utf-32-be',
2003 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002004
Guido van Rossum8358db22007-08-18 21:39:55 +00002005 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002006 # character in TextIOWrapper._pending_line.
2007 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002008 # XXX: str.encode() should return bytes
2009 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002010 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002011 for bufsize in range(1, 10):
2012 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002013 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2014 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002015 encoding=encoding)
2016 if do_reads:
2017 got_lines = []
2018 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002019 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002020 if c2 == '':
2021 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002022 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002023 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002024 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002025 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002026
2027 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002028 self.assertEqual(got_line, exp_line)
2029 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002031 def test_newlines_input(self):
2032 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002033 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2034 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002035 (None, normalized.decode("ascii").splitlines(keepends=True)),
2036 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2038 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2039 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002040 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002041 buf = self.BytesIO(testdata)
2042 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002043 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002044 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002045 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047 def test_newlines_output(self):
2048 testdict = {
2049 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2050 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2051 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2052 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2053 }
2054 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2055 for newline, expected in tests:
2056 buf = self.BytesIO()
2057 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2058 txt.write("AAA\nB")
2059 txt.write("BB\nCCC\n")
2060 txt.write("X\rY\r\nZ")
2061 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002062 self.assertEqual(buf.closed, False)
2063 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064
2065 def test_destructor(self):
2066 l = []
2067 base = self.BytesIO
2068 class MyBytesIO(base):
2069 def close(self):
2070 l.append(self.getvalue())
2071 base.close(self)
2072 b = MyBytesIO()
2073 t = self.TextIOWrapper(b, encoding="ascii")
2074 t.write("abc")
2075 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002076 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002077 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002078
2079 def test_override_destructor(self):
2080 record = []
2081 class MyTextIO(self.TextIOWrapper):
2082 def __del__(self):
2083 record.append(1)
2084 try:
2085 f = super().__del__
2086 except AttributeError:
2087 pass
2088 else:
2089 f()
2090 def close(self):
2091 record.append(2)
2092 super().close()
2093 def flush(self):
2094 record.append(3)
2095 super().flush()
2096 b = self.BytesIO()
2097 t = MyTextIO(b, encoding="ascii")
2098 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002099 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100 self.assertEqual(record, [1, 2, 3])
2101
2102 def test_error_through_destructor(self):
2103 # Test that the exception state is not modified by a destructor,
2104 # even if close() fails.
2105 rawio = self.CloseFailureIO()
2106 def f():
2107 self.TextIOWrapper(rawio).xyzzy
2108 with support.captured_output("stderr") as s:
2109 self.assertRaises(AttributeError, f)
2110 s = s.getvalue().strip()
2111 if s:
2112 # The destructor *may* have printed an unraisable error, check it
2113 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002114 self.assertTrue(s.startswith("Exception IOError: "), s)
2115 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002116
Guido van Rossum9b76da62007-04-11 01:09:03 +00002117 # Systematic tests of the text I/O API
2118
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002119 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002120 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002121 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002123 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002124 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002125 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002126 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002127 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002128 self.assertEqual(f.tell(), 0)
2129 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002130 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002131 self.assertEqual(f.seek(0), 0)
2132 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002133 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002134 self.assertEqual(f.read(2), "ab")
2135 self.assertEqual(f.read(1), "c")
2136 self.assertEqual(f.read(1), "")
2137 self.assertEqual(f.read(), "")
2138 self.assertEqual(f.tell(), cookie)
2139 self.assertEqual(f.seek(0), 0)
2140 self.assertEqual(f.seek(0, 2), cookie)
2141 self.assertEqual(f.write("def"), 3)
2142 self.assertEqual(f.seek(cookie), cookie)
2143 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002144 if enc.startswith("utf"):
2145 self.multi_line_test(f, enc)
2146 f.close()
2147
2148 def multi_line_test(self, f, enc):
2149 f.seek(0)
2150 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002151 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002152 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002153 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002154 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002155 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002156 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002157 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002158 wlines.append((f.tell(), line))
2159 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002160 f.seek(0)
2161 rlines = []
2162 while True:
2163 pos = f.tell()
2164 line = f.readline()
2165 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002166 break
2167 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002168 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002171 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002172 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002173 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002174 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002175 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002176 p2 = f.tell()
2177 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002178 self.assertEqual(f.tell(), p0)
2179 self.assertEqual(f.readline(), "\xff\n")
2180 self.assertEqual(f.tell(), p1)
2181 self.assertEqual(f.readline(), "\xff\n")
2182 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002183 f.seek(0)
2184 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002185 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002186 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002187 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002188 f.close()
2189
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002190 def test_seeking(self):
2191 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002192 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002193 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002194 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002195 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002196 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002197 suffix = bytes(u_suffix.encode("utf-8"))
2198 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002199 with self.open(support.TESTFN, "wb") as f:
2200 f.write(line*2)
2201 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2202 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002203 self.assertEqual(s, str(prefix, "ascii"))
2204 self.assertEqual(f.tell(), prefix_size)
2205 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002207 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002208 # Regression test for a specific bug
2209 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002210 with self.open(support.TESTFN, "wb") as f:
2211 f.write(data)
2212 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2213 f._CHUNK_SIZE # Just test that it exists
2214 f._CHUNK_SIZE = 2
2215 f.readline()
2216 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002218 def test_seek_and_tell(self):
2219 #Test seek/tell using the StatefulIncrementalDecoder.
2220 # Make test faster by doing smaller seeks
2221 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002222
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002223 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002224 """Tell/seek to various points within a data stream and ensure
2225 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002226 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002227 f.write(data)
2228 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002229 f = self.open(support.TESTFN, encoding='test_decoder')
2230 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002231 decoded = f.read()
2232 f.close()
2233
Neal Norwitze2b07052008-03-18 19:52:05 +00002234 for i in range(min_pos, len(decoded) + 1): # seek positions
2235 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002236 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002237 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002238 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002239 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002240 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002241 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002242 f.close()
2243
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002244 # Enable the test decoder.
2245 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002246
2247 # Run the tests.
2248 try:
2249 # Try each test case.
2250 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002251 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002252
2253 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002254 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2255 offset = CHUNK_SIZE - len(input)//2
2256 prefix = b'.'*offset
2257 # Don't bother seeking into the prefix (takes too long).
2258 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002259 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002260
2261 # Ensure our test decoder won't interfere with subsequent tests.
2262 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002263 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002265 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002266 data = "1234567890"
2267 tests = ("utf-16",
2268 "utf-16-le",
2269 "utf-16-be",
2270 "utf-32",
2271 "utf-32-le",
2272 "utf-32-be")
2273 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002274 buf = self.BytesIO()
2275 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002276 # Check if the BOM is written only once (see issue1753).
2277 f.write(data)
2278 f.write(data)
2279 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002280 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002281 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002282 self.assertEqual(f.read(), data * 2)
2283 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002284
Benjamin Petersona1b49012009-03-31 23:11:32 +00002285 def test_unreadable(self):
2286 class UnReadable(self.BytesIO):
2287 def readable(self):
2288 return False
2289 txt = self.TextIOWrapper(UnReadable())
2290 self.assertRaises(IOError, txt.read)
2291
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002292 def test_read_one_by_one(self):
2293 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002294 reads = ""
2295 while True:
2296 c = txt.read(1)
2297 if not c:
2298 break
2299 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002300 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002301
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002302 def test_readlines(self):
2303 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2304 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2305 txt.seek(0)
2306 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2307 txt.seek(0)
2308 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2309
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002310 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002311 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002312 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002313 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002314 reads = ""
2315 while True:
2316 c = txt.read(128)
2317 if not c:
2318 break
2319 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002320 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002321
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002322 def test_writelines(self):
2323 l = ['ab', 'cd', 'ef']
2324 buf = self.BytesIO()
2325 txt = self.TextIOWrapper(buf)
2326 txt.writelines(l)
2327 txt.flush()
2328 self.assertEqual(buf.getvalue(), b'abcdef')
2329
2330 def test_writelines_userlist(self):
2331 l = UserList(['ab', 'cd', 'ef'])
2332 buf = self.BytesIO()
2333 txt = self.TextIOWrapper(buf)
2334 txt.writelines(l)
2335 txt.flush()
2336 self.assertEqual(buf.getvalue(), b'abcdef')
2337
2338 def test_writelines_error(self):
2339 txt = self.TextIOWrapper(self.BytesIO())
2340 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2341 self.assertRaises(TypeError, txt.writelines, None)
2342 self.assertRaises(TypeError, txt.writelines, b'abc')
2343
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002344 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002346
2347 # read one char at a time
2348 reads = ""
2349 while True:
2350 c = txt.read(1)
2351 if not c:
2352 break
2353 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002354 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002355
2356 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002357 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002358 txt._CHUNK_SIZE = 4
2359
2360 reads = ""
2361 while True:
2362 c = txt.read(4)
2363 if not c:
2364 break
2365 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002366 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002367
2368 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002369 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002370 txt._CHUNK_SIZE = 4
2371
2372 reads = txt.read(4)
2373 reads += txt.read(4)
2374 reads += txt.readline()
2375 reads += txt.readline()
2376 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002377 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002378
2379 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002380 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002381 txt._CHUNK_SIZE = 4
2382
2383 reads = txt.read(4)
2384 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002385 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002386
2387 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002388 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002389 txt._CHUNK_SIZE = 4
2390
2391 reads = txt.read(4)
2392 pos = txt.tell()
2393 txt.seek(0)
2394 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002395 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002396
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002397 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002398 buffer = self.BytesIO(self.testdata)
2399 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002400
2401 self.assertEqual(buffer.seekable(), txt.seekable())
2402
Antoine Pitroue4501852009-05-14 18:55:55 +00002403 def test_append_bom(self):
2404 # The BOM is not written again when appending to a non-empty file
2405 filename = support.TESTFN
2406 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2407 with self.open(filename, 'w', encoding=charset) as f:
2408 f.write('aaa')
2409 pos = f.tell()
2410 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002411 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002412
2413 with self.open(filename, 'a', encoding=charset) as f:
2414 f.write('xxx')
2415 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002416 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002417
2418 def test_seek_bom(self):
2419 # Same test, but when seeking manually
2420 filename = support.TESTFN
2421 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2422 with self.open(filename, 'w', encoding=charset) as f:
2423 f.write('aaa')
2424 pos = f.tell()
2425 with self.open(filename, 'r+', encoding=charset) as f:
2426 f.seek(pos)
2427 f.write('zzz')
2428 f.seek(0)
2429 f.write('bbb')
2430 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002431 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002432
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002433 def test_errors_property(self):
2434 with self.open(support.TESTFN, "w") as f:
2435 self.assertEqual(f.errors, "strict")
2436 with self.open(support.TESTFN, "w", errors="replace") as f:
2437 self.assertEqual(f.errors, "replace")
2438
Brett Cannon31f59292011-02-21 19:29:56 +00002439 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002440 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002441 def test_threads_write(self):
2442 # Issue6750: concurrent writes could duplicate data
2443 event = threading.Event()
2444 with self.open(support.TESTFN, "w", buffering=1) as f:
2445 def run(n):
2446 text = "Thread%03d\n" % n
2447 event.wait()
2448 f.write(text)
2449 threads = [threading.Thread(target=lambda n=x: run(n))
2450 for x in range(20)]
2451 for t in threads:
2452 t.start()
2453 time.sleep(0.02)
2454 event.set()
2455 for t in threads:
2456 t.join()
2457 with self.open(support.TESTFN) as f:
2458 content = f.read()
2459 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002460 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002461
Antoine Pitrou6be88762010-05-03 16:48:20 +00002462 def test_flush_error_on_close(self):
2463 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2464 def bad_flush():
2465 raise IOError()
2466 txt.flush = bad_flush
2467 self.assertRaises(IOError, txt.close) # exception not swallowed
2468
2469 def test_multi_close(self):
2470 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2471 txt.close()
2472 txt.close()
2473 txt.close()
2474 self.assertRaises(ValueError, txt.flush)
2475
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002476 def test_unseekable(self):
2477 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2478 self.assertRaises(self.UnsupportedOperation, txt.tell)
2479 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2480
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002481 def test_readonly_attributes(self):
2482 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2483 buf = self.BytesIO(self.testdata)
2484 with self.assertRaises(AttributeError):
2485 txt.buffer = buf
2486
Antoine Pitroue96ec682011-07-23 21:46:35 +02002487 def test_rawio(self):
2488 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2489 # that subprocess.Popen() can have the required unbuffered
2490 # semantics with universal_newlines=True.
2491 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2492 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2493 # Reads
2494 self.assertEqual(txt.read(4), 'abcd')
2495 self.assertEqual(txt.readline(), 'efghi\n')
2496 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2497
2498 def test_rawio_write_through(self):
2499 # Issue #12591: with write_through=True, writes don't need a flush
2500 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2501 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2502 write_through=True)
2503 txt.write('1')
2504 txt.write('23\n4')
2505 txt.write('5')
2506 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2507
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002508class CTextIOWrapperTest(TextIOWrapperTest):
2509
2510 def test_initialization(self):
2511 r = self.BytesIO(b"\xc3\xa9\n\n")
2512 b = self.BufferedReader(r, 1000)
2513 t = self.TextIOWrapper(b)
2514 self.assertRaises(TypeError, t.__init__, b, newline=42)
2515 self.assertRaises(ValueError, t.read)
2516 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2517 self.assertRaises(ValueError, t.read)
2518
2519 def test_garbage_collection(self):
2520 # C TextIOWrapper objects are collected, and collecting them flushes
2521 # all data to disk.
2522 # The Python version has __del__, so it ends in gc.garbage instead.
2523 rawio = io.FileIO(support.TESTFN, "wb")
2524 b = self.BufferedWriter(rawio)
2525 t = self.TextIOWrapper(b, encoding="ascii")
2526 t.write("456def")
2527 t.x = t
2528 wr = weakref.ref(t)
2529 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002530 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002531 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002532 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002533 self.assertEqual(f.read(), b"456def")
2534
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002535 def test_rwpair_cleared_before_textio(self):
2536 # Issue 13070: TextIOWrapper's finalization would crash when called
2537 # after the reference to the underlying BufferedRWPair's writer got
2538 # cleared by the GC.
2539 for i in range(1000):
2540 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2541 t1 = self.TextIOWrapper(b1, encoding="ascii")
2542 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2543 t2 = self.TextIOWrapper(b2, encoding="ascii")
2544 # circular references
2545 t1.buddy = t2
2546 t2.buddy = t1
2547 support.gc_collect()
2548
2549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002550class PyTextIOWrapperTest(TextIOWrapperTest):
2551 pass
2552
2553
2554class IncrementalNewlineDecoderTest(unittest.TestCase):
2555
2556 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002557 # UTF-8 specific tests for a newline decoder
2558 def _check_decode(b, s, **kwargs):
2559 # We exercise getstate() / setstate() as well as decode()
2560 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002561 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002562 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002563 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002564
Antoine Pitrou180a3362008-12-14 16:36:46 +00002565 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002566
Antoine Pitrou180a3362008-12-14 16:36:46 +00002567 _check_decode(b'\xe8', "")
2568 _check_decode(b'\xa2', "")
2569 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002570
Antoine Pitrou180a3362008-12-14 16:36:46 +00002571 _check_decode(b'\xe8', "")
2572 _check_decode(b'\xa2', "")
2573 _check_decode(b'\x88', "\u8888")
2574
2575 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002576 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2577
Antoine Pitrou180a3362008-12-14 16:36:46 +00002578 decoder.reset()
2579 _check_decode(b'\n', "\n")
2580 _check_decode(b'\r', "")
2581 _check_decode(b'', "\n", final=True)
2582 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002583
Antoine Pitrou180a3362008-12-14 16:36:46 +00002584 _check_decode(b'\r', "")
2585 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002586
Antoine Pitrou180a3362008-12-14 16:36:46 +00002587 _check_decode(b'\r\r\n', "\n\n")
2588 _check_decode(b'\r', "")
2589 _check_decode(b'\r', "\n")
2590 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002591
Antoine Pitrou180a3362008-12-14 16:36:46 +00002592 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2593 _check_decode(b'\xe8\xa2\x88', "\u8888")
2594 _check_decode(b'\n', "\n")
2595 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2596 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002597
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002598 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002599 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002600 if encoding is not None:
2601 encoder = codecs.getincrementalencoder(encoding)()
2602 def _decode_bytewise(s):
2603 # Decode one byte at a time
2604 for b in encoder.encode(s):
2605 result.append(decoder.decode(bytes([b])))
2606 else:
2607 encoder = None
2608 def _decode_bytewise(s):
2609 # Decode one char at a time
2610 for c in s:
2611 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002612 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002613 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002614 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002615 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002616 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002617 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002618 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002619 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002620 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002621 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002622 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002623 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002624 input = "abc"
2625 if encoder is not None:
2626 encoder.reset()
2627 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002628 self.assertEqual(decoder.decode(input), "abc")
2629 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002630
2631 def test_newline_decoder(self):
2632 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002633 # None meaning the IncrementalNewlineDecoder takes unicode input
2634 # rather than bytes input
2635 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002636 'utf-16', 'utf-16-le', 'utf-16-be',
2637 'utf-32', 'utf-32-le', 'utf-32-be',
2638 )
2639 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 decoder = enc and codecs.getincrementaldecoder(enc)()
2641 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2642 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002643 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2645 self.check_newline_decoding_utf8(decoder)
2646
Antoine Pitrou66913e22009-03-06 23:40:56 +00002647 def test_newline_bytes(self):
2648 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2649 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(dec.newlines, None)
2651 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2652 self.assertEqual(dec.newlines, None)
2653 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2654 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002655 dec = self.IncrementalNewlineDecoder(None, translate=False)
2656 _check(dec)
2657 dec = self.IncrementalNewlineDecoder(None, translate=True)
2658 _check(dec)
2659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002660class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2661 pass
2662
2663class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2664 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002665
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002666
Guido van Rossum01a27522007-03-07 01:00:12 +00002667# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002668
Guido van Rossum5abbf752007-08-27 17:39:33 +00002669class MiscIOTest(unittest.TestCase):
2670
Barry Warsaw40e82462008-11-20 20:14:50 +00002671 def tearDown(self):
2672 support.unlink(support.TESTFN)
2673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674 def test___all__(self):
2675 for name in self.io.__all__:
2676 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002677 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002678 if name == "open":
2679 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002680 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002681 self.assertTrue(issubclass(obj, Exception), name)
2682 elif not name.startswith("SEEK_"):
2683 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002684
Barry Warsaw40e82462008-11-20 20:14:50 +00002685 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002686 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002687 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002688 f.close()
2689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002690 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002691 self.assertEqual(f.name, support.TESTFN)
2692 self.assertEqual(f.buffer.name, support.TESTFN)
2693 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2694 self.assertEqual(f.mode, "U")
2695 self.assertEqual(f.buffer.mode, "rb")
2696 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002697 f.close()
2698
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002699 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002700 self.assertEqual(f.mode, "w+")
2701 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2702 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002703
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002704 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002705 self.assertEqual(g.mode, "wb")
2706 self.assertEqual(g.raw.mode, "wb")
2707 self.assertEqual(g.name, f.fileno())
2708 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002709 f.close()
2710 g.close()
2711
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002712 def test_io_after_close(self):
2713 for kwargs in [
2714 {"mode": "w"},
2715 {"mode": "wb"},
2716 {"mode": "w", "buffering": 1},
2717 {"mode": "w", "buffering": 2},
2718 {"mode": "wb", "buffering": 0},
2719 {"mode": "r"},
2720 {"mode": "rb"},
2721 {"mode": "r", "buffering": 1},
2722 {"mode": "r", "buffering": 2},
2723 {"mode": "rb", "buffering": 0},
2724 {"mode": "w+"},
2725 {"mode": "w+b"},
2726 {"mode": "w+", "buffering": 1},
2727 {"mode": "w+", "buffering": 2},
2728 {"mode": "w+b", "buffering": 0},
2729 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002730 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002731 f.close()
2732 self.assertRaises(ValueError, f.flush)
2733 self.assertRaises(ValueError, f.fileno)
2734 self.assertRaises(ValueError, f.isatty)
2735 self.assertRaises(ValueError, f.__iter__)
2736 if hasattr(f, "peek"):
2737 self.assertRaises(ValueError, f.peek, 1)
2738 self.assertRaises(ValueError, f.read)
2739 if hasattr(f, "read1"):
2740 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002741 if hasattr(f, "readall"):
2742 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002743 if hasattr(f, "readinto"):
2744 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2745 self.assertRaises(ValueError, f.readline)
2746 self.assertRaises(ValueError, f.readlines)
2747 self.assertRaises(ValueError, f.seek, 0)
2748 self.assertRaises(ValueError, f.tell)
2749 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002750 self.assertRaises(ValueError, f.write,
2751 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002752 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002753 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002755 def test_blockingioerror(self):
2756 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002757 class C(str):
2758 pass
2759 c = C("")
2760 b = self.BlockingIOError(1, c)
2761 c.b = b
2762 b.c = c
2763 wr = weakref.ref(c)
2764 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002765 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002766 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002767
2768 def test_abcs(self):
2769 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002770 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2771 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2772 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2773 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002774
2775 def _check_abc_inheritance(self, abcmodule):
2776 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002777 self.assertIsInstance(f, abcmodule.IOBase)
2778 self.assertIsInstance(f, abcmodule.RawIOBase)
2779 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2780 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002781 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002782 self.assertIsInstance(f, abcmodule.IOBase)
2783 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2784 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2785 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002787 self.assertIsInstance(f, abcmodule.IOBase)
2788 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2789 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2790 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002791
2792 def test_abc_inheritance(self):
2793 # Test implementations inherit from their respective ABCs
2794 self._check_abc_inheritance(self)
2795
2796 def test_abc_inheritance_official(self):
2797 # Test implementations inherit from the official ABCs of the
2798 # baseline "io" module.
2799 self._check_abc_inheritance(io)
2800
Antoine Pitroue033e062010-10-29 10:38:18 +00002801 def _check_warn_on_dealloc(self, *args, **kwargs):
2802 f = open(*args, **kwargs)
2803 r = repr(f)
2804 with self.assertWarns(ResourceWarning) as cm:
2805 f = None
2806 support.gc_collect()
2807 self.assertIn(r, str(cm.warning.args[0]))
2808
2809 def test_warn_on_dealloc(self):
2810 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2811 self._check_warn_on_dealloc(support.TESTFN, "wb")
2812 self._check_warn_on_dealloc(support.TESTFN, "w")
2813
2814 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2815 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002816 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002817 for fd in fds:
2818 try:
2819 os.close(fd)
2820 except EnvironmentError as e:
2821 if e.errno != errno.EBADF:
2822 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002823 self.addCleanup(cleanup_fds)
2824 r, w = os.pipe()
2825 fds += r, w
2826 self._check_warn_on_dealloc(r, *args, **kwargs)
2827 # When using closefd=False, there's no warning
2828 r, w = os.pipe()
2829 fds += r, w
2830 with warnings.catch_warnings(record=True) as recorded:
2831 open(r, *args, closefd=False, **kwargs)
2832 support.gc_collect()
2833 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002834
2835 def test_warn_on_dealloc_fd(self):
2836 self._check_warn_on_dealloc_fd("rb", buffering=0)
2837 self._check_warn_on_dealloc_fd("rb")
2838 self._check_warn_on_dealloc_fd("r")
2839
2840
Antoine Pitrou243757e2010-11-05 21:15:39 +00002841 def test_pickling(self):
2842 # Pickling file objects is forbidden
2843 for kwargs in [
2844 {"mode": "w"},
2845 {"mode": "wb"},
2846 {"mode": "wb", "buffering": 0},
2847 {"mode": "r"},
2848 {"mode": "rb"},
2849 {"mode": "rb", "buffering": 0},
2850 {"mode": "w+"},
2851 {"mode": "w+b"},
2852 {"mode": "w+b", "buffering": 0},
2853 ]:
2854 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2855 with self.open(support.TESTFN, **kwargs) as f:
2856 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2857
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002858 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2859 def test_nonblock_pipe_write_bigbuf(self):
2860 self._test_nonblock_pipe_write(16*1024)
2861
2862 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2863 def test_nonblock_pipe_write_smallbuf(self):
2864 self._test_nonblock_pipe_write(1024)
2865
2866 def _set_non_blocking(self, fd):
2867 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2868 self.assertNotEqual(flags, -1)
2869 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2870 self.assertEqual(res, 0)
2871
2872 def _test_nonblock_pipe_write(self, bufsize):
2873 sent = []
2874 received = []
2875 r, w = os.pipe()
2876 self._set_non_blocking(r)
2877 self._set_non_blocking(w)
2878
2879 # To exercise all code paths in the C implementation we need
2880 # to play with buffer sizes. For instance, if we choose a
2881 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2882 # then we will never get a partial write of the buffer.
2883 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2884 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2885
2886 with rf, wf:
2887 for N in 9999, 73, 7574:
2888 try:
2889 i = 0
2890 while True:
2891 msg = bytes([i % 26 + 97]) * N
2892 sent.append(msg)
2893 wf.write(msg)
2894 i += 1
2895
2896 except self.BlockingIOError as e:
2897 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002898 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002899 sent[-1] = sent[-1][:e.characters_written]
2900 received.append(rf.read())
2901 msg = b'BLOCKED'
2902 wf.write(msg)
2903 sent.append(msg)
2904
2905 while True:
2906 try:
2907 wf.flush()
2908 break
2909 except self.BlockingIOError as e:
2910 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002911 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002912 self.assertEqual(e.characters_written, 0)
2913 received.append(rf.read())
2914
2915 received += iter(rf.read, None)
2916
2917 sent, received = b''.join(sent), b''.join(received)
2918 self.assertTrue(sent == received)
2919 self.assertTrue(wf.closed)
2920 self.assertTrue(rf.closed)
2921
Charles-François Natalidc3044c2012-01-09 22:40:02 +01002922 def test_create_fail(self):
2923 # 'x' mode fails if file is existing
2924 with self.open(support.TESTFN, 'w'):
2925 pass
2926 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
2927
2928 def test_create_writes(self):
2929 # 'x' mode opens for writing
2930 with self.open(support.TESTFN, 'xb') as f:
2931 f.write(b"spam")
2932 with self.open(support.TESTFN, 'rb') as f:
2933 self.assertEqual(b"spam", f.read())
2934
Christian Heimes7b648752012-09-10 14:48:43 +02002935 def test_open_allargs(self):
2936 # there used to be a buffer overflow in the parser for rawmode
2937 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
2938
2939
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002940class CMiscIOTest(MiscIOTest):
2941 io = io
2942
2943class PyMiscIOTest(MiscIOTest):
2944 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002945
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002946
2947@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2948class SignalsTest(unittest.TestCase):
2949
2950 def setUp(self):
2951 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2952
2953 def tearDown(self):
2954 signal.signal(signal.SIGALRM, self.oldalrm)
2955
2956 def alarm_interrupt(self, sig, frame):
2957 1/0
2958
2959 @unittest.skipUnless(threading, 'Threading required for this test.')
2960 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2961 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002962 invokes the signal handler, and bubbles up the exception raised
2963 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002964 read_results = []
2965 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02002966 if hasattr(signal, 'pthread_sigmask'):
2967 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002968 s = os.read(r, 1)
2969 read_results.append(s)
2970 t = threading.Thread(target=_read)
2971 t.daemon = True
2972 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002973 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002974 try:
2975 wio = self.io.open(w, **fdopen_kwargs)
2976 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002977 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002978 # Fill the pipe enough that the write will be blocking.
2979 # It will be interrupted by the timer armed above. Since the
2980 # other thread has read one byte, the low-level write will
2981 # return with a successful (partial) result rather than an EINTR.
2982 # The buffered IO layer must check for pending signal
2983 # handlers, which in this case will invoke alarm_interrupt().
2984 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02002985 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002986 t.join()
2987 # We got one byte, get another one and check that it isn't a
2988 # repeat of the first one.
2989 read_results.append(os.read(r, 1))
2990 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2991 finally:
2992 os.close(w)
2993 os.close(r)
2994 # This is deliberate. If we didn't close the file descriptor
2995 # before closing wio, wio would try to flush its internal
2996 # buffer, and block again.
2997 try:
2998 wio.close()
2999 except IOError as e:
3000 if e.errno != errno.EBADF:
3001 raise
3002
3003 def test_interrupted_write_unbuffered(self):
3004 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3005
3006 def test_interrupted_write_buffered(self):
3007 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3008
3009 def test_interrupted_write_text(self):
3010 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3011
Brett Cannon31f59292011-02-21 19:29:56 +00003012 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003013 def check_reentrant_write(self, data, **fdopen_kwargs):
3014 def on_alarm(*args):
3015 # Will be called reentrantly from the same thread
3016 wio.write(data)
3017 1/0
3018 signal.signal(signal.SIGALRM, on_alarm)
3019 r, w = os.pipe()
3020 wio = self.io.open(w, **fdopen_kwargs)
3021 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003022 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003023 # Either the reentrant call to wio.write() fails with RuntimeError,
3024 # or the signal handler raises ZeroDivisionError.
3025 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3026 while 1:
3027 for i in range(100):
3028 wio.write(data)
3029 wio.flush()
3030 # Make sure the buffer doesn't fill up and block further writes
3031 os.read(r, len(data) * 100)
3032 exc = cm.exception
3033 if isinstance(exc, RuntimeError):
3034 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3035 finally:
3036 wio.close()
3037 os.close(r)
3038
3039 def test_reentrant_write_buffered(self):
3040 self.check_reentrant_write(b"xy", mode="wb")
3041
3042 def test_reentrant_write_text(self):
3043 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3044
Antoine Pitrou707ce822011-02-25 21:24:11 +00003045 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3046 """Check that a buffered read, when it gets interrupted (either
3047 returning a partial result or EINTR), properly invokes the signal
3048 handler and retries if the latter returned successfully."""
3049 r, w = os.pipe()
3050 fdopen_kwargs["closefd"] = False
3051 def alarm_handler(sig, frame):
3052 os.write(w, b"bar")
3053 signal.signal(signal.SIGALRM, alarm_handler)
3054 try:
3055 rio = self.io.open(r, **fdopen_kwargs)
3056 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003057 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003058 # Expected behaviour:
3059 # - first raw read() returns partial b"foo"
3060 # - second raw read() returns EINTR
3061 # - third raw read() returns b"bar"
3062 self.assertEqual(decode(rio.read(6)), "foobar")
3063 finally:
3064 rio.close()
3065 os.close(w)
3066 os.close(r)
3067
Antoine Pitrou20db5112011-08-19 20:32:34 +02003068 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003069 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3070 mode="rb")
3071
Antoine Pitrou20db5112011-08-19 20:32:34 +02003072 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003073 self.check_interrupted_read_retry(lambda x: x,
3074 mode="r")
3075
3076 @unittest.skipUnless(threading, 'Threading required for this test.')
3077 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3078 """Check that a buffered write, when it gets interrupted (either
3079 returning a partial result or EINTR), properly invokes the signal
3080 handler and retries if the latter returned successfully."""
3081 select = support.import_module("select")
3082 # A quantity that exceeds the buffer size of an anonymous pipe's
3083 # write end.
3084 N = 1024 * 1024
3085 r, w = os.pipe()
3086 fdopen_kwargs["closefd"] = False
3087 # We need a separate thread to read from the pipe and allow the
3088 # write() to finish. This thread is started after the SIGALRM is
3089 # received (forcing a first EINTR in write()).
3090 read_results = []
3091 write_finished = False
3092 def _read():
3093 while not write_finished:
3094 while r in select.select([r], [], [], 1.0)[0]:
3095 s = os.read(r, 1024)
3096 read_results.append(s)
3097 t = threading.Thread(target=_read)
3098 t.daemon = True
3099 def alarm1(sig, frame):
3100 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003101 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003102 def alarm2(sig, frame):
3103 t.start()
3104 signal.signal(signal.SIGALRM, alarm1)
3105 try:
3106 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003107 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003108 # Expected behaviour:
3109 # - first raw write() is partial (because of the limited pipe buffer
3110 # and the first alarm)
3111 # - second raw write() returns EINTR (because of the second alarm)
3112 # - subsequent write()s are successful (either partial or complete)
3113 self.assertEqual(N, wio.write(item * N))
3114 wio.flush()
3115 write_finished = True
3116 t.join()
3117 self.assertEqual(N, sum(len(x) for x in read_results))
3118 finally:
3119 write_finished = True
3120 os.close(w)
3121 os.close(r)
3122 # This is deliberate. If we didn't close the file descriptor
3123 # before closing wio, wio would try to flush its internal
3124 # buffer, and could block (in case of failure).
3125 try:
3126 wio.close()
3127 except IOError as e:
3128 if e.errno != errno.EBADF:
3129 raise
3130
Antoine Pitrou20db5112011-08-19 20:32:34 +02003131 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003132 self.check_interrupted_write_retry(b"x", mode="wb")
3133
Antoine Pitrou20db5112011-08-19 20:32:34 +02003134 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003135 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3136
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003137
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003138class CSignalsTest(SignalsTest):
3139 io = io
3140
3141class PySignalsTest(SignalsTest):
3142 io = pyio
3143
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003144 # Handling reentrancy issues would slow down _pyio even more, so the
3145 # tests are disabled.
3146 test_reentrant_write_buffered = None
3147 test_reentrant_write_text = None
3148
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003149
Guido van Rossum28524c72007-02-27 05:47:44 +00003150def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003151 tests = (CIOTest, PyIOTest,
3152 CBufferedReaderTest, PyBufferedReaderTest,
3153 CBufferedWriterTest, PyBufferedWriterTest,
3154 CBufferedRWPairTest, PyBufferedRWPairTest,
3155 CBufferedRandomTest, PyBufferedRandomTest,
3156 StatefulIncrementalDecoderTest,
3157 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3158 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003159 CMiscIOTest, PyMiscIOTest,
3160 CSignalsTest, PySignalsTest,
3161 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003162
3163 # Put the namespaces of the IO module we are testing and some useful mock
3164 # classes in the __dict__ of each test.
3165 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003166 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003167 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3168 c_io_ns = {name : getattr(io, name) for name in all_members}
3169 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3170 globs = globals()
3171 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3172 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3173 # Avoid turning open into a bound method.
3174 py_io_ns["open"] = pyio.OpenWrapper
3175 for test in tests:
3176 if test.__name__.startswith("C"):
3177 for name, obj in c_io_ns.items():
3178 setattr(test, name, obj)
3179 elif test.__name__.startswith("Py"):
3180 for name, obj in py_io_ns.items():
3181 setattr(test, name, obj)
3182
3183 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003184
3185if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003186 test_main()