blob: 73231981adebbf8a5352876bf1e208b719d5806f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
# Pick a byteslike() factory depending on whether ctypes can be imported.
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Fallback without ctypes: wrap the bytes in an array of type "b"."""
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure; byteslike() resizes its storage."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the (initially empty) structure so it can hold the payload,
        # then copy the payload in through a byte-wise memoryview.
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder

63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Data to be read is taken from a queue of chunks given to the
    constructor; a None entry makes readinto() return None once.  Written
    data is collected in _write_stack.  _reads counts readinto() calls and
    _extraneous_reads counts the calls made after the queue ran dry.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Store an immutable copy so later changes to *b* are not visible.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            # Nothing queued any more: report EOF and remember the call.
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            # A queued None is consumed and passed straight through.
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits; keep the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
125
# The read()-less mock combined with the C implementation's RawIOBase.
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

# The read()-less mock combined with the Python implementation's RawIOBase.
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
131
132
class MockRawIO(MockRawIOWithoutRead):
    """Mock raw stream that adds an explicit read() to the read()-less mock.

    read() serves the same queue of chunks as the inherited readinto().
    """

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Once the queue is empty, b"" is returned and the call is counted in
        _extraneous_reads.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means EOF.  A bare ``except`` here
            # would also swallow KeyboardInterrupt, SystemExit and genuine
            # bugs, silently turning them into an end-of-file result.
            self._extraneous_reads += 1
            return b""

142
# MockRawIO combined with the C implementation's RawIOBase.
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

# MockRawIO combined with the Python implementation's RawIOBase.
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose results do not match what really happened."""

    def write(self, b):
        # Report twice the count the parent returned.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Return the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # Always a negative position, whatever was requested.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)

166
# MisbehavedRawIO combined with the C implementation's RawIOBase.
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass

# MisbehavedRawIO combined with the Python implementation's RawIOBase.
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
172
173
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError.

    Later close() calls find the flag already set and do nothing.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        # Set the flag before raising so a repeated close() stays silent.
        self.closed = 1
        raise OSError

Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
# CloseFailureIO combined with the C implementation's RawIOBase.
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass

# CloseFailureIO combined with the Python implementation's RawIOBase.
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
187
188
class MockFileIO:
    """Mixin that logs the result size of every read() and readinto().

    The actual I/O is delegated through super() to the next class in the
    MRO; read_history receives one entry per call.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # Log the length of what came back, or None when nothing did.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count

Guido van Rossum78892e42007-04-06 17:31:18 +0000204
# The read-logging mixin on top of the C implementation's BytesIO.
class CMockFileIO(MockFileIO, io.BytesIO):
    pass

# The read-logging mixin on top of the Python implementation's BytesIO.
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
210
211
class MockUnseekableIO:
    """Mixin that makes a stream claim to be, and behave as, unseekable.

    The concrete class must provide an UnsupportedOperation attribute; it
    is the exception type raised by seek(), tell() and truncate().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def truncate(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

224
# Unseekable mixin on top of the C BytesIO; raises the C module's exception.
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation

# Unseekable mixin on top of the Python BytesIO; raises pyio's exception.
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
230
231
class MockNonBlockWriterIO:
    """Mock writer that can pretend a write would block.

    Written data is collected in _write_stack.  After block_on(char), a
    write() containing *char* is cut short just before it; a write()
    starting with *char* returns None and disarms the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything written so far and empty the log in place.
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        # -1 means "no blocker armed" or "blocker not in this payload".
        position = payload.find(blocker) if blocker else -1
        if position > 0:
            # write data up to the first blocker
            self._write_stack.append(payload[:position])
            return position
        if position == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None
        self._write_stack.append(payload)
        return len(payload)

275
# Non-blocking writer mock on the C RawIOBase, exposing io's BlockingIOError.
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError

# Non-blocking writer mock on the Python RawIOBase, exposing pyio's error.
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
def setUp(self):
    # Remove the scratch file named by support.TESTFN before each test.
    support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
def tearDown(self):
    # Remove the scratch file named by support.TESTFN after each test.
    support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
def write_ops(self, f):
    # Shared write/seek/truncate scenario for a writable binary stream *f*.
    # The return value of every call is checked; if all steps succeed the
    # stream ends up holding b"hello world\n".
    self.assertEqual(f.write(b"blah."), 5)
    f.truncate(0)
    # truncate() is expected to leave the stream position untouched
    self.assertEqual(f.tell(), 5)
    f.seek(0)

    self.assertEqual(f.write(b"blah."), 5)
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"Hello."), 6)
    self.assertEqual(f.tell(), 6)
    # whence=1: relative to the current position
    self.assertEqual(f.seek(-1, 1), 5)
    self.assertEqual(f.tell(), 5)
    buffer = bytearray(b" world\n\n\n")
    self.assertEqual(f.write(buffer), 9)
    buffer[:] = b"*" * 9  # Overwrite our copy of the data
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"h"), 1)
    # whence=2: relative to the end (14 bytes have been written)
    self.assertEqual(f.seek(-1, 2), 13)
    self.assertEqual(f.tell(), 13)

    # Cut off the last two bytes; the position must again stay put
    self.assertEqual(f.truncate(12), 12)
    self.assertEqual(f.tell(), 13)
    # A float is not acceptable as a seek offset
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
def read_ops(self, f, buffered=False):
    # Shared read/seek scenario for a readable binary stream *f* that
    # holds b"hello world\n".  With buffered=True the additional checks
    # for read() without a size and for readinto1() are run as well.
    data = f.read(5)
    self.assertEqual(data, b"hello")
    # Reuse the 5 bytes as a writable bytes-like target for readinto()
    data = byteslike(data)
    self.assertEqual(f.readinto(data), 5)
    self.assertEqual(bytes(data), b" worl")
    data = bytearray(5)
    # Only 2 bytes are left; the buffer keeps its length of 5
    self.assertEqual(f.readinto(data), 2)
    self.assertEqual(len(data), 5)
    self.assertEqual(data[:2], b"d\n")
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.read(20), b"hello world\n")
    # At EOF both read() and readinto() report that nothing was read
    self.assertEqual(f.read(1), b"")
    self.assertEqual(f.readinto(byteslike(b"x")), 0)
    self.assertEqual(f.seek(-6, 2), 6)
    self.assertEqual(f.read(5), b"world")
    # Zero-sized requests
    self.assertEqual(f.read(0), b"")
    self.assertEqual(f.readinto(byteslike()), 0)
    self.assertEqual(f.seek(-6, 1), 5)
    self.assertEqual(f.read(5), b" worl")
    self.assertEqual(f.tell(), 10)
    # A float is not acceptable as a seek offset
    self.assertRaises(TypeError, f.seek, 0.0)
    if buffered:
        f.seek(0)
        self.assertEqual(f.read(), b"hello world\n")
        f.seek(6)
        self.assertEqual(f.read(), b"world\n")
        self.assertEqual(f.read(), b"")
        f.seek(0)
        data = byteslike(5)
        self.assertEqual(f.readinto1(data), 5)
        self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
# Offset used by large_file_ops(): one more than the largest signed 32-bit int.
LARGE = 2**31
349
def large_file_ops(self, f):
    """Exercise seek/tell/write/truncate/read around offset self.LARGE.

    *f* must be a binary stream that is both readable and writable.
    """
    # Use assertTrue rather than a bare ``assert`` so the precondition is
    # still enforced when the test suite runs under ``python -O``.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    self.assertEqual(f.seek(self.LARGE), self.LARGE)
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    # truncate() without an argument cuts at the current position
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    # Truncating below the current position must not move the position
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    # Only one byte remains between LARGE and the new end of file
    self.assertEqual(f.read(2), b"x")

366
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = support.TESTFN

    def check_unreadable(stream):
        self.assertRaises(unsupported, stream.read)
        self.assertRaises(unsupported, stream.readline)

    def check_unwritable(stream, payload, lines):
        self.assertRaises(unsupported, stream.write, payload)
        self.assertRaises(unsupported, stream.writelines, lines)

    # Write-only files, text and binary, buffered and raw
    for mode in ("w", "wb"):
        with self.open(path, mode) as fp:
            check_unreadable(fp)
    with self.open(path, "wb", buffering=0) as fp:
        check_unreadable(fp)
    # Read-only files, raw, buffered and text
    with self.open(path, "rb", buffering=0) as fp:
        check_unwritable(fp, b"blah", [b"blah\n"])
    with self.open(path, "rb") as fp:
        check_unwritable(fp, b"blah", [b"blah\n"])
    with self.open(path, "r") as fp:
        check_unwritable(fp, "blah", ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(unsupported, fp.seek, 1, self.SEEK_CUR)
        self.assertRaises(unsupported, fp.seek, -1, self.SEEK_END)

Benjamin Peterson81971ea2009-05-14 22:01:31 +0000389
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported
    # The purpose of this test is to try fileno(), reading, writing and
    # seeking operations with various objects that indicate they do not
    # support these operations.

    # Each factory below builds one kind of stream object to be probed.

    def pipe_reader():
        [r, w] = os.pipe()
        os.close(w)  # So that read() is harmless
        return self.FileIO(r, "r")

    def pipe_writer():
        [r, w] = os.pipe()
        self.addCleanup(os.close, r)
        # Guarantee that we can write into the pipe without blocking
        thread = threading.Thread(target=os.read, args=(r, 100))
        thread.start()
        self.addCleanup(thread.join)
        return self.FileIO(w, "w")

    def buffered_reader():
        return self.BufferedReader(self.MockUnseekableIO())

    def buffered_writer():
        return self.BufferedWriter(self.MockUnseekableIO())

    def buffered_random():
        return self.BufferedRandom(self.BytesIO())

    def buffered_rw_pair():
        return self.BufferedRWPair(self.MockUnseekableIO(),
            self.MockUnseekableIO())

    def text_reader():
        # Take writable()/write() from BufferedIOBase instead of the mock's
        # own base class
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        # Take readable()/read() from BufferedIOBase instead of the mock's
        # own base class
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    # (factory, abilities) pairs.  The letters in the abilities string are
    # looked up below: "f" = fileno() works, "r" = readable,
    # "w" = writable, "s" = seekable.
    tests = (
        (pipe_reader, "fr"), (pipe_writer, "fw"),
        (buffered_reader, "r"), (buffered_writer, "w"),
        (buffered_random, "rws"), (buffered_rw_pair, "rw"),
        (text_reader, "r"), (text_writer, "w"),
        (self.BytesIO, "rws"), (self.StringIO, "rws"),
    )
    for [test, abilities] in tests:
        if test is pipe_writer and not threading:
            continue  # Skip subtest that uses a background thread
        with self.subTest(test), test() as obj:
            readable = "r" in abilities
            self.assertEqual(obj.readable(), readable)
            writable = "w" in abilities
            self.assertEqual(obj.writable(), writable)

            # Pick str or bytes test data according to the kind of stream
            if isinstance(obj, self.TextIOBase):
                data = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                data = b"3"
            else:
                self.fail("Unknown base class")

            if "f" in abilities:
                obj.fileno()
            else:
                self.assertRaises(OSError, obj.fileno)

            if readable:
                obj.read(1)
                obj.read()
            else:
                self.assertRaises(OSError, obj.read, 1)
                self.assertRaises(OSError, obj.read)

            if writable:
                obj.write(data)
            else:
                self.assertRaises(OSError, obj.write, data)

            if sys.platform.startswith("win") and test in (
                    pipe_reader, pipe_writer):
                # Pipes seem to appear as seekable on Windows
                continue
            seekable = "s" in abilities
            self.assertEqual(obj.seekable(), seekable)

            if seekable:
                obj.tell()
                obj.seek(0)
            else:
                self.assertRaises(OSError, obj.tell)
                self.assertRaises(OSError, obj.seek, 0)

            # truncate() is only expected to work when the object is both
            # writable and seekable
            if writable and seekable:
                obj.truncate()
                obj.truncate(0)
            else:
                self.assertRaises(OSError, obj.truncate)
                self.assertRaises(OSError, obj.truncate, 0)

495
Antoine Pitrou13348842012-01-29 18:36:34 +0100496 def test_open_handles_NUL_chars(self):
497 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300498 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
499 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100500
def test_raw_file_io(self):
    # Unbuffered files: check the capability flags, then run the shared
    # write and read scenarios.
    path = support.TESTFN
    with self.open(path, "wb", buffering=0) as raw:
        flags = (raw.readable(), raw.writable(), raw.seekable())
        self.assertEqual(flags, (False, True, True))
        self.write_ops(raw)
    with self.open(path, "rb", buffering=0) as raw:
        flags = (raw.readable(), raw.writable(), raw.seekable())
        self.assertEqual(flags, (True, False, True))
        self.read_ops(raw)

Guido van Rossum28524c72007-02-27 05:47:44 +0000512
def test_buffered_file_io(self):
    # Default-buffered binary files: check the capability flags, then run
    # the shared write scenario and the extended read scenario.
    path = support.TESTFN
    with self.open(path, "wb") as buffered:
        flags = (buffered.readable(), buffered.writable(),
                 buffered.seekable())
        self.assertEqual(flags, (False, True, True))
        self.write_ops(buffered)
    with self.open(path, "rb") as buffered:
        flags = (buffered.readable(), buffered.writable(),
                 buffered.seekable())
        self.assertEqual(flags, (True, False, True))
        self.read_ops(buffered, True)

Guido van Rossum87429772007-04-10 21:06:59 +0000524
def test_readline(self):
    # readline() with and without a size limit, and with a bad argument
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    # (arguments for readline(), line expected back), consumed in order
    script = (
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    )
    with self.open(path, "rb") as binary:
        for call_args, expected in script:
            self.assertEqual(binary.readline(*call_args), expected)
        with self.assertRaises(TypeError):
            binary.readline(5.3)
    with self.open(path, "r") as text:
        with self.assertRaises(TypeError):
            text.readline(5.3)

Guido van Rossum48fc58a2007-06-07 23:45:37 +0000538
Guido van Rossum28524c72007-02-27 05:47:44 +0000539 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000541 self.write_ops(f)
542 data = f.getvalue()
543 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000545 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000546
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; It takes
    # a long time to build the >2GB file and takes >2GB of disk space
    # therefore the resource must be enabled to run this test.
    platform = sys.platform
    if platform.startswith('win') or platform == 'darwin':
        reason = 'test requires %s bytes and a long time to run' % self.LARGE
        support.requires('largefile', reason)
    # First unbuffered (buffering argument 0), then with default buffering
    for extra_args in ((0,), ()):
        with self.open(support.TESTFN, "w+b", *extra_args) as f:
            self.large_file_ops(f)

Guido van Rossum87429772007-04-10 21:06:59 +0000559
def test_with_open(self):
    # The file must be closed on leaving the with block, both on normal
    # exit and when the body raises.
    path = support.TESTFN
    for bufsize in (0, 1, 100):
        # Normal exit
        stream = None
        with self.open(path, "wb", bufsize) as stream:
            stream.write(b"xxx")
        self.assertEqual(stream.closed, True)
        # Exit through an exception
        stream = None
        try:
            with self.open(path, "wb", bufsize) as stream:
                1/0
        except ZeroDivisionError:
            self.assertEqual(stream.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")

574
Antoine Pitrou08838b62009-01-21 00:55:13 +0000575 # issue 5008
def test_append_mode_tell(self):
    path = support.TESTFN
    with self.open(path, "wb") as seed:
        seed.write(b"xxx")
    # Binary append, raw and buffered: tell() is the existing size
    for options in ({"buffering": 0}, {}):
        with self.open(path, "ab", **options) as appender:
            self.assertEqual(appender.tell(), 3)
    # Text append: tell() is only required to be positive
    with self.open(path, "a") as appender:
        self.assertGreater(appender.tell(), 0)

Antoine Pitrou08838b62009-01-21 00:55:13 +0000585
def test_destructor(self):
    # Record the order in which __del__(), close() and flush() run when
    # the last reference to an unclosed FileIO subclass is dropped, and
    # check that the written data reached the file.
    record = []
    class MyFileIO(self.FileIO):
        def __del__(self):
            record.append(1)
            try:
                f = super().__del__
            except AttributeError:
                # The base class does not provide __del__; nothing to chain
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    # Dropping an unclosed file is expected to emit a ResourceWarning
    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611
def _check_base_destructor(self, base):
    # Subclass *base*, drop the only instance and check that __del__(),
    # close() and flush() ran in that order.  The recorded values are read
    # from instance attributes, so each hook also proves that the
    # attributes were still available when it ran.
    record = []
    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3
        def __del__(self):
            record.append(self.on_del)
            try:
                f = super().__del__
            except AttributeError:
                # The base class does not provide __del__; nothing to chain
                pass
            else:
                f()
        def close(self):
            record.append(self.on_close)
            super().close()
        def flush(self):
            record.append(self.on_flush)
            super().flush()
    f = MyIO()
    del f
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])

641
def test_IOBase_destructor(self):
    # Destructor ordering check for subclasses of IOBase
    self._check_base_destructor(self.IOBase)
644
def test_RawIOBase_destructor(self):
    # Destructor ordering check for subclasses of RawIOBase
    self._check_base_destructor(self.RawIOBase)
647
def test_BufferedIOBase_destructor(self):
    # Destructor ordering check for subclasses of BufferedIOBase
    self._check_base_destructor(self.BufferedIOBase)
650
def test_TextIOBase_destructor(self):
    # Destructor ordering check for subclasses of TextIOBase
    self._check_base_destructor(self.TextIOBase)
653
def test_close_flushes(self):
    # Data written through a buffered file must be on disk once the with
    # block has closed it.
    path = support.TESTFN
    with self.open(path, "wb") as writer:
        writer.write(b"xxx")
    with self.open(path, "rb") as reader:
        contents = reader.read()
        self.assertEqual(contents, b"xxx")

Guido van Rossuma9e20242007-03-08 00:43:48 +0000659
def test_array_writes(self):
    # write() must accept an array and report its size in bytes;
    # writelines() must accept a tuple containing the array.
    values = array.array('i', range(10))
    byte_count = len(values.tobytes())
    # Factories rather than instances, so each stream is only created
    # once the previous one has been checked and closed.
    factories = (
        lambda: self.BytesIO(),
        lambda: self.FileIO(support.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make_stream in factories:
        with make_stream() as stream:
            self.assertEqual(stream.write(values), byte_count)
            stream.writelines((values,))

Guido van Rossumd4103952007-04-12 05:44:49 +0000672
def test_closefd(self):
    # closefd=False together with a file name (write mode) is an error
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', closefd=False)

Guido van Rossuma9e20242007-03-08 00:43:48 +0000676
def test_read_closed(self):
    # A wrapper opened on a borrowed descriptor (closefd=False) can be
    # read, and reading it after close() raises ValueError.
    path = support.TESTFN
    with self.open(path, "w") as writer:
        writer.write("egg\n")
    with self.open(path, "r") as owner:
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), "egg\n")
        borrowed.seek(0)
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.read()

686
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)

Christian Heimesecc42a22008-11-05 19:30:32 +0000690
def test_closefd_attr(self):
    # The raw file's closefd attribute must reflect how it was opened
    path = support.TESTFN
    with self.open(path, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(path, "r") as owner:
        # Opened by name: the raw file owns its descriptor
        self.assertEqual(owner.buffer.raw.closefd, True)
        # Opened on the same descriptor with closefd=False: it does not
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.buffer.raw.closefd, False)

698
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    # Dropping an unclosed file is expected to emit a ResourceWarning
    with support.check_warnings(('', ResourceWarning)):
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Make the object refer to itself, so that it is part of a
        # reference cycle when the name is deleted
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
    # The weak reference is dead once the object has been collected
    self.assertIsNone(wr(), wr)
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"abcxxx")

Christian Heimesecc42a22008-11-05 19:30:32 +0000712
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # Same expectation for the raw, buffered and text layers, in that order.
    layers = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in layers:
        with self.open(zero, mode, **options) as stream:
            self.assertRaises(OverflowError, stream.read)
728
def check_flush_error_on_close(self, *args, **kwargs):
    # Helper: open a file with the given arguments, replace its flush()
    # with one that fails, and check that close() propagates the error,
    # still leaves the file closed, and called flush() while the file was
    # still open.
    stream = self.open(*args, **kwargs)
    state_at_flush = []

    def failing_flush():
        # Remember whether the stream was already closed when flush() ran.
        state_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    self.assertRaises(OSError, stream.close)  # exception not swallowed
    self.assertTrue(stream.closed)
    self.assertTrue(state_at_flush)      # flush() called
    self.assertFalse(state_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200743
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    #
    # Run the same three scenarios (open by path, open by fd, open by fd
    # with closefd=False) against the raw, buffered and text layers.
    fd_flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ("wb", {"buffering": 0}),  # raw file
        ("wb", {}),                # buffered io
        ("w", {}),                 # text io
    )
    for mode, options in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **options)
        fd = os.open(support.TESTFN, fd_flags)
        self.check_flush_error_on_close(fd, mode, **options)
        fd = os.open(support.TESTFN, fd_flags)
        self.check_flush_error_on_close(fd, mode, closefd=False, **options)
        # closefd=False: the descriptor is still ours to close.
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000767
def test_multi_close(self):
    # Closing repeatedly is harmless; using the file afterwards is not.
    raw = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    self.assertRaises(ValueError, raw.flush)
774
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally).
    #
    # The mock is fed the chunks b"abc", b"d", None, b"efg", None; each
    # read(2) below must yield the next expected result, where None is the
    # mock's "no data" marker and the final b"" is end of stream.
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    expected_reads = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_reads:
        result = rawio.read(2)
        if expected is None:
            # Identity check: an object that merely compares equal to
            # None must not satisfy the test.
            self.assertIsNone(result)
        else:
            self.assertEqual(result, expected)
787
def test_types_have_dict(self):
    # Instances of the base classes and of the in-memory streams must all
    # carry an instance __dict__.
    factories = (
        self.IOBase,
        self.RawIOBase,
        self.TextIOBase,
        self.StringIO,
        self.BytesIO,
    )
    instances = tuple(factory() for factory in factories)
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
798
def test_opener(self):
    # open() must use the descriptor returned by a custom opener instead of
    # opening the (non-existent) path itself.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    fd = os.open(support.TESTFN, os.O_RDONLY)
    opener = lambda path, flags: fd
    with self.open("non-existent", "r", opener=opener) as reader:
        self.assertEqual(reader.read(), "egg\n")
807
def test_fileio_closefd(self):
    # Issue #4841
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            wrapper = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close first
            wrapper.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close second
            wrapper.close()
            second.readline()
819
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected with ValueError; the whole
    # attempt runs under check_no_resource_warning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300824
def test_invalid_newline(self):
    # An unknown newline= value is rejected with ValueError; the whole
    # attempt runs under check_no_resource_warning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300829
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # Only read()/read1() are defined, so readinto()/readinto1() come
        # from the base class.
        def read(self, size):
            return b"12345"
        read1 = read

    stream = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(stream, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
842
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200843
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class first, then the instance, so both are only
        # reachable through the cycle.
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000863
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000866
Guido van Rossuma9e20242007-03-08 00:43:48 +0000867
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Checks in both directions that io.RawIOBase and pyio.RawIOBase expose
    # the same API.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
881
882
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The methods use self.tp as the buffered class under test and reach the
    # mock raw streams through self as well.

    def test_detach(self):
        # detach() returns the raw stream once; a second call fails.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        self.assertRaises(ValueError, buffered.detach)

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is answered with the value the mock raw stream reports.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # Destroying an instance of a subclass must go through the
        # overridden __del__, close and flush, in that order.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # buffered should now be closed, and using it a second time should
        # raise a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            # The temporary buffered object dies while the AttributeError
            # is propagating.
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's name
        # (str or bytes) once it has one.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        named_cases = (
            ("dummy", "<%s name='dummy'>"),
            (b"dummy", "<%s name=b'dummy'>"),
        )
        for name, template in named_cases:
            raw.name = name
            self.assertEqual(repr(buffered), template % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        state_at_flush = []

        def failing_flush():
            # Snapshot both layers' closed flags at the moment of the flush.
            state_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        self.assertRaises(OSError, buffered.close)  # exception not swallowed
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(state_at_flush)      # flush() called
        self.assertFalse(state_at_flush[0])  # flush() called before file closed
        self.assertFalse(state_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error chained as its __context__, and the
        # buffered object is not marked closed.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        def failing_flush():
            # Deliberately undefined name: raises NameError.
            raise non_existing_flush

        def failing_close():
            # Deliberately undefined name: raises NameError.
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing afterwards is an error.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The raw attribute cannot be rebound.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1038
Guido van Rossum78892e42007-04-06 17:31:18 +00001039
class SizeofTest:
    # Mixin: checks how sys.getsizeof() of a buffered object relates to its
    # buffer size.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference in buffer
        # size between two otherwise identical objects.
        small = 4096
        large = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the size.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001061
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for a buffered reader.  The methods use self.tp as the class
    # under test and reach the mock raw streams through self.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only can be deleted safely,
        # fails cleanly on use, and works once __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args (checked on the reader from the last iteration)
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must serve data from the buffer when it can; the raw read
        # counter shows when it had to go back to the raw stream.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() fills the target across raw chunk boundaries; a short
        # final read leaves the tail of the target untouched.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same again with a raw stream that reports "no data" (None).
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # readinto1() drains the buffer first; the raw read counter shows
        # when a new raw read was needed.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array:
        # *method_name* is the reader method ("readinto" or "readinto1")
        # used to fill a multi-byte-element array.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # NOTE(review): a 1-byte item size would give len(b) == 32, not 16;
        # the value 16 is kept from the original guard - confirm intent.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader over the same three raw chunks for every call.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes of successive buffered reads, and
        # the raw read sizes those are expected to trigger.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything up to end of stream.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Record for the main thread, then let the thread
                        # fail visibly.
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    self.assertEqual(combined.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Overrides the common test: also repeat the checks after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # With the misbehaving raw mock, seek() and tell() raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this builds io.BufferedReader/io.BytesIO directly
        # instead of going through self.tp, so subclasses with a different
        # tp run the same check against the io classes - confirm whether
        # that is intentional.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1337
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001338
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests (plus the sizeof checks) on io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises((OverflowError, MemoryError, ValueError),
                          bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(rawio)
            reader.f = reader  # self-reference: needs the cycle collector
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1385
1386
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests with pyio.BufferedReader as the class
    # under test.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001389
Guido van Rossuma9e20242007-03-08 00:43:48 +00001390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001391class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1392 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001393
def test_constructor(self):
    # __init__() may be called again on a live writer; invalid buffer sizes
    # are rejected, and writes made before and after a re-initialisation
    # all reach the same raw stream.
    rawio = self.MockRawIO()
    bufio = self.tp(rawio)
    bufio.__init__(rawio)
    for good_size in (1024, 16):
        bufio.__init__(rawio, buffer_size=good_size)
    self.assertEqual(3, bufio.write(b"abc"))
    bufio.flush()
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, bufio.__init__, rawio,
                          buffer_size=bad_size)
    bufio.__init__(rawio)
    self.assertEqual(3, bufio.write(b"ghi"))
    bufio.flush()
    written = b"".join(rawio._write_stack)
    self.assertEqual(written, b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001409
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001410 def test_uninitialized(self):
1411 bufio = self.tp.__new__(self.tp)
1412 del bufio
1413 bufio = self.tp.__new__(self.tp)
1414 self.assertRaisesRegex((ValueError, AttributeError),
1415 'uninitialized|has no attribute',
1416 bufio.write, b'')
1417 bufio.__init__(self.MockRawIO())
1418 self.assertEqual(bufio.write(b''), 0)
1419
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001420 def test_detach_flush(self):
1421 raw = self.MockRawIO()
1422 buf = self.tp(raw)
1423 buf.write(b"howdy!")
1424 self.assertFalse(raw._write_stack)
1425 buf.detach()
1426 self.assertEqual(raw._write_stack, [b"howdy!"])
1427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001429 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001430 writer = self.MockRawIO()
1431 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001432 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001433 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001434 buffer = bytearray(b"def")
1435 bufio.write(buffer)
1436 buffer[:] = b"***" # Overwrite our copy of the data
1437 bufio.flush()
1438 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001439
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440 def test_write_overflow(self):
1441 writer = self.MockRawIO()
1442 bufio = self.tp(writer, 8)
1443 contents = b"abcdefghijklmnop"
1444 for n in range(0, len(contents), 3):
1445 bufio.write(contents[n:n+3])
1446 flushed = b"".join(writer._write_stack)
1447 # At least (total - 8) bytes were implicitly flushed, perhaps more
1448 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001449 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def check_writes(self, intermediate_func):
1452 # Lots of writes, test the flushed output is as expected.
1453 contents = bytes(range(256)) * 1000
1454 n = 0
1455 writer = self.MockRawIO()
1456 bufio = self.tp(writer, 13)
1457 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1458 def gen_sizes():
1459 for size in count(1):
1460 for i in range(15):
1461 yield size
1462 sizes = gen_sizes()
1463 while n < len(contents):
1464 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001465 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001466 intermediate_func(bufio)
1467 n += size
1468 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001469 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001471 def test_writes(self):
1472 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001473
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001474 def test_writes_and_flushes(self):
1475 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001476
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001477 def test_writes_and_seeks(self):
1478 def _seekabs(bufio):
1479 pos = bufio.tell()
1480 bufio.seek(pos + 1, 0)
1481 bufio.seek(pos - 1, 0)
1482 bufio.seek(pos, 0)
1483 self.check_writes(_seekabs)
1484 def _seekrel(bufio):
1485 pos = bufio.seek(0, 1)
1486 bufio.seek(+1, 1)
1487 bufio.seek(-1, 1)
1488 bufio.seek(pos, 0)
1489 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001490
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001491 def test_writes_and_truncates(self):
1492 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001493
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494 def test_write_non_blocking(self):
1495 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001496 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001497
Ezio Melottib3aedd42010-11-20 19:04:17 +00001498 self.assertEqual(bufio.write(b"abcd"), 4)
1499 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500 # 1 byte will be written, the rest will be buffered
1501 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001502 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001504 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1505 raw.block_on(b"0")
1506 try:
1507 bufio.write(b"opqrwxyz0123456789")
1508 except self.BlockingIOError as e:
1509 written = e.characters_written
1510 else:
1511 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001512 self.assertEqual(written, 16)
1513 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001514 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001515
Ezio Melottib3aedd42010-11-20 19:04:17 +00001516 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001517 s = raw.pop_written()
1518 # Previously buffered bytes were flushed
1519 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001520
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001521 def test_write_and_rewind(self):
1522 raw = io.BytesIO()
1523 bufio = self.tp(raw, 4)
1524 self.assertEqual(bufio.write(b"abcdef"), 6)
1525 self.assertEqual(bufio.tell(), 6)
1526 bufio.seek(0, 0)
1527 self.assertEqual(bufio.write(b"XY"), 2)
1528 bufio.seek(6, 0)
1529 self.assertEqual(raw.getvalue(), b"XYcdef")
1530 self.assertEqual(bufio.write(b"123456"), 6)
1531 bufio.flush()
1532 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001534 def test_flush(self):
1535 writer = self.MockRawIO()
1536 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001537 bufio.write(b"abc")
1538 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001539 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001540
Antoine Pitrou131a4892012-10-16 22:57:11 +02001541 def test_writelines(self):
1542 l = [b'ab', b'cd', b'ef']
1543 writer = self.MockRawIO()
1544 bufio = self.tp(writer, 8)
1545 bufio.writelines(l)
1546 bufio.flush()
1547 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1548
1549 def test_writelines_userlist(self):
1550 l = UserList([b'ab', b'cd', b'ef'])
1551 writer = self.MockRawIO()
1552 bufio = self.tp(writer, 8)
1553 bufio.writelines(l)
1554 bufio.flush()
1555 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1556
1557 def test_writelines_error(self):
1558 writer = self.MockRawIO()
1559 bufio = self.tp(writer, 8)
1560 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1561 self.assertRaises(TypeError, bufio.writelines, None)
1562 self.assertRaises(TypeError, bufio.writelines, 'abc')
1563
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001564 def test_destructor(self):
1565 writer = self.MockRawIO()
1566 bufio = self.tp(writer, 8)
1567 bufio.write(b"abc")
1568 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001569 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001570 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001571
1572 def test_truncate(self):
1573 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001574 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001575 bufio = self.tp(raw, 8)
1576 bufio.write(b"abcdef")
1577 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001578 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001579 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001580 self.assertEqual(f.read(), b"abc")
1581
Victor Stinner45df8202010-04-28 22:31:17 +00001582 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001583 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001584 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001585 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001586 # Write out many bytes from many threads and test they were
1587 # all flushed.
1588 N = 1000
1589 contents = bytes(range(256)) * N
1590 sizes = cycle([1, 19])
1591 n = 0
1592 queue = deque()
1593 while n < len(contents):
1594 size = next(sizes)
1595 queue.append(contents[n:n+size])
1596 n += size
1597 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001598 # We use a real file object because it allows us to
1599 # exercise situations where the GIL is released before
1600 # writing the buffer to the raw streams. This is in addition
1601 # to concurrency issues due to switching threads in the middle
1602 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001603 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001604 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001605 errors = []
1606 def f():
1607 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001608 while True:
1609 try:
1610 s = queue.popleft()
1611 except IndexError:
1612 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001613 bufio.write(s)
1614 except Exception as e:
1615 errors.append(e)
1616 raise
1617 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001618 with support.start_threads(threads):
1619 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001620 self.assertFalse(errors,
1621 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001622 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001623 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 s = f.read()
1625 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001626 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001627 finally:
1628 support.unlink(support.TESTFN)
1629
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001630 def test_misbehaved_io(self):
1631 rawio = self.MisbehavedRawIO()
1632 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001633 self.assertRaises(OSError, bufio.seek, 0)
1634 self.assertRaises(OSError, bufio.tell)
1635 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001636
Florent Xicluna109d5732012-07-07 17:03:22 +02001637 def test_max_buffer_size_removal(self):
1638 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001639 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001640
Benjamin Peterson68623612012-12-20 11:53:11 -06001641 def test_write_error_on_close(self):
1642 raw = self.MockRawIO()
1643 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001644 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001645 raw.write = bad_write
1646 b = self.tp(raw)
1647 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001648 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001649 self.assertTrue(b.closed)
1650
Benjamin Peterson59406a92009-03-26 17:10:29 +00001651
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    """Run the BufferedWriterTest suite against io.BufferedWriter.

    Adds a few extra checks on top of the shared suite: oversized buffers,
    the state after a failed __init__, garbage collection and the
    constructor's TypeError message.
    """

    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After each rejected __init__ call, write() must raise ValueError
        # as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        #
        # The raw FileIO below creates support.TESTFN on disk; make sure the
        # file is removed again whether the test passes or fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Put the writer in a reference cycle with itself so that only
            # the cyclic garbage collector can reclaim it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001696
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriterTest suite against pyio.BufferedWriter."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001699
class BufferedRWPairTest(unittest.TestCase):
    """Tests for BufferedRWPair, parameterized on ``self.tp``.

    The mock raw streams, ``BytesIO`` and ``UnsupportedOperation`` are looked
    up on ``self``; they are not defined in this class.
    """

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # Creating and dropping an object that never saw __init__ must be
        # harmless.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        expected_message = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, expected_message,
                               pair.read, 0)
        self.assertRaisesRegex(expected_errors, expected_message,
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, rw_pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is expected to raise TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read(3), b"abc")
        self.assertEqual(rw_pair.read(1), b"d")
        self.assertEqual(rw_pair.read(), b"ef")
        # read(None) is expected to read everything.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        # Every assertion gets a fresh pair reading the same data.
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(5)
                fill = getattr(rw_pair, method)
                self.assertEqual(fill(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        raw_writer = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), raw_writer)

        rw_pair.write(b"abc")
        rw_pair.flush()
        scratch = bytearray(b"def")
        rw_pair.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        rw_pair.flush()
        self.assertEqual(raw_writer._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(rw_pair.peek(3).startswith(b"abc"))
        # The same bytes can still be read after the peek.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        # Only the reader's close() fails (with a NameError).
        def reader_close():
            reader_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # Only the writer's close() fails (with a NameError).
        def writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is the one raised and
        # the writer's error is chained to it as __context__.
        def reader_close():
            reader_non_existing

        def writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        raised = err.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        combinations = (
            (False, False),
            (True, False),
            (False, True),
            (True, True),
        )
        for reader_tty, writer_tty in combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(pair.isatty())
            else:
                self.assertFalse(pair.isatty())

    def test_weakref_clearing(self):
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        # Drop the referent first, then the weak reference itself.
        brw = None
        ref = None  # Shouldn't segfault.
1887
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPairTest suite against io.BufferedRWPair."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001890
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPairTest suite against pyio.BufferedRWPair."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001894
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for a buffered stream that is both readable and writable.

    Inherits the reader and the writer suites and adds tests that interleave
    reads, writes, seeks and flushes on the same object (``self.tp``).
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        stream.write(b"ddd")
        stream.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must consume and return up to n bytes (or
        # everything that is left when n is omitted).
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer far larger than the data.
            target = bytearray(n if n >= 0 else 9999)
            filled = bufio.readinto(target)
            return bytes(target[:filled])

        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            if n != -1:
                peeked = peeked[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(peeked), 1)
            return peeked

        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_ahead(bufio):
            bufio.peek(1)

        self.check_writes(_peek_ahead)

        def _peek_behind(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)

        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        # Step back one byte and read it again after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)

        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # Same as test_writes_and_reads, using read1().
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)

        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # Same as test_writes_and_reads, using readinto().
        def _readinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))

        self.check_writes(_readinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        for i in range(len(original)):
            for j in range(i, len(original)):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Position j was written first and position i afterwards, so
                # byte i wins when both positions coincide.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2122
R David Murray67bfe802013-02-23 21:51:05 -05002123
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandomTest suite against io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            raw = self.MockRawIO()
            buffered = self.tp(raw)
            allocation_errors = (OverflowError, MemoryError, ValueError)
            self.assertRaises(allocation_errors,
                              buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run both the reader and the writer flavour of the test.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        surplus_args = (1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), *surplus_args)
2145
2146
class PyBufferedRandomTest(BufferedRandomTest):
    # Same test body as the base class, bound to pyio's BufferedRandom.
    tp = pyio.BufferedRandom
2149
2150
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002151# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2152# properties:
2153# - A single output character can correspond to many bytes of input.
2154# - The number of input bytes to complete the character can be
2155# undetermined until the last input byte is received.
2156# - The number of input bytes can vary depending on previous input.
2157# - A single input byte can correspond to many characters of output.
2158# - The number of output characters can be undetermined until the
2159# last input byte is received.
2160# - The number of output characters can vary depending on previous input.
2161
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I applies to the bytes that
    follow the word which set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that flags == 0 right after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text completed so far.

        When *final* is true, a partially buffered word is flushed as if
        it had been terminated.
        """
        period = ord('.')
        pieces = []
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        An 'i...' or 'o...' word updates I or O and produces no text.
        The buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch read by lookupTestDecoder(); off unless a test turns it on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder', and only
        while codecEnabled is true; otherwise return None."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2246
# Make the decoder defined above reachable through the codec registry.
# Its lookup function answers nothing until a test sets codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2250
2251
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002294
2295class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002296
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" endings, plus the text
    # expected once every ending has been folded to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Remove any scratch file left at the shared path.
    scratch = support.TESTFN
    support.unlink(scratch)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002301
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002304
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Calling __init__ again reconfigures the existing wrapper.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # A newline of the wrong type or with an unsupported value is refused.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2318
def test_uninitialized(self):
    wrapper_type = self.TextIOWrapper

    # Create an instance without __init__ and drop it immediately.
    discarded = wrapper_type.__new__(wrapper_type)
    del discarded

    text = wrapper_type.__new__(wrapper_type)
    with self.assertRaises(Exception):
        repr(text)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           text.read, 0)

    # Once initialised, the same object works normally.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2329
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    self.assertRaisesRegex(LookupError, "is not a text encoding",
                           self.TextIOWrapper, buffered, encoding="hex")
2338
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very buffer the wrapper was built on.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Pending text reaches the raw stream when detaching, and a second
    # detach() is refused.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
2357
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def expect(template):
        # Fill the module name into the template and compare with repr().
        self.assertEqual(repr(text), template % modname)

    expect("<%s.TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    expect("<%s.TextIOWrapper name='dummy' encoding='utf-8'>")
    text.mode = "r"
    expect("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    expect("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2377
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, bytes expected in the raw stream afterwards)
    steps = (
        ("X", b""),                 # No flush happened
        ("Y\nZ", b"XY\nZ"),         # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, visible in steps:
        text.write(written)
        self.assertEqual(raw.getvalue(), visible)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002388
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        text = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(text.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2406
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: a fileno() beyond the C int / unsigned int range
    # must raise OverflowError from the constructor.
    import _testcapi
    raw = self.BytesIO()
    for too_big in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        raw.fileno = lambda fd=too_big: fd
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2416
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()

    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the reported name is not a known codec.
    codecs.lookup(implicit.encoding)
2425
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: decoding must fail
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.read)

    # (3) ignore and (4) replace: decoding succeeds with altered text
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for policy, expected in lenient:
        text = self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", errors=policy)
        self.assertEqual(text.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002443
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: encoding must fail
    for extra in ({}, {"errors": "strict"}):
        text = self.TextIOWrapper(self.BytesIO(),
                                  encoding="ascii", **extra)
        self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore and (4) replace: encoding succeeds with altered bytes
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for policy, expected in lenient:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=policy,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002467
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]
    joined = ''.join(input_lines)

    # (newline argument, lines expected back from the wrapper)
    expectations = [
        (None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']),
        ('', input_lines),
        ('\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]),
        ('\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]),
        ('\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]),
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in expectations:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Read two characters, then the rest of the line,
                        # until read() reports end of file.
                        got_lines = []
                        for head in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(head), 2)
                            got_lines.append(head + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002509
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")

    # (newline argument, lines expected from readlines())
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # A full read() after rewinding must match the joined lines.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002525
def test_newlines_output(self):
    # Bytes expected on disk for each explicit newline argument.
    by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is checked against the entry for os.linesep.
    cases = [(None, by_newline[os.linesep])]
    cases.extend(sorted(by_newline.items()))

    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in cases:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002543
def test_destructor(self):
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        # Note the stream contents at the moment close() is called.
        def close(self):
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    # Drop the only reference, then check what close() observed.
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002557
def test_override_destructor(self):
    # Each overridden hook appends its own marker, so the final list
    # shows the order in which they ran.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                inherited = super().__del__
            except AttributeError:
                pass
            else:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    text = TracingTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2580
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002595
Guido van Rossum9b76da62007-04-11 01:09:03 +00002596 # Systematic tests of the text I/O API
2597
def test_basic_io(self):
    # Write, read, seek and tell on a real file for a range of chunk
    # sizes and encodings.  Files are opened with "with" so a failing
    # assertion cannot leave a handle open.
    # NOTE: "utf-16-be" and "utf-16-le" are not exercised here.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()  # position of end-of-file
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2626
def multi_line_test(self, f, enc):
    # Helper: empty *f*, write lines of assorted lengths while recording
    # tell() before each write, then read them back and require the same
    # (position, line) pairs.  *enc* is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of *size*.
        line = "".join([sample[i % period] for i in range(size)]) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002648
def test_telling(self):
    # tell() must return the recorded positions after each readline(),
    # and must raise OSError while the file is being iterated.  The
    # file is opened with "with" so a failing assertion cannot leave the
    # handle open.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2668
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two bytes before the
    # default chunk size.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")

    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002685
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002696
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_round_trips(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so a failing assertion
        # cannot leave a handle open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_round_trips(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_round_trips(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002743
def test_encoded_writes(self):
    data = "1234567890"
    doubled = data * 2
    encodings = ("utf-16", "utf-16-le", "utf-16-be",
                 "utf-32", "utf-32-le", "utf-32-be")
    for encoding in encodings:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        text.write(data)
        text.write(data)
        # Read everything back twice, rewinding before each read.
        text.seek(0)
        self.assertEqual(text.read(), doubled)
        text.seek(0)
        self.assertEqual(text.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002763
def test_unreadable(self):
    # Reading through a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002770
def test_read_one_by_one(self):
    # Read a character at a time; "\r\n" must come back as a single "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel keeps calling read(1) until it returns "".
    pieces = list(iter(lambda: wrapper.read(1), ""))
    self.assertEqual("".join(pieces), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002780
def test_readlines(self):
    # readlines() with no hint, a None hint, and a small numeric hint.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    # A hint of 5 stops after the first two lines.
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2788
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002789 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # Place "\r\n" so that it straddles the 128 character boundary, then
    # read in 128-character requests until the stream is exhausted.
    payload = b"A" * 127 + b"\r\nB"
    wrapper = self.TextIOWrapper(self.BytesIO(payload))
    # iter() with a sentinel stops at the first empty read.
    chunks = list(iter(lambda: wrapper.read(128), ""))
    self.assertEqual("".join(chunks), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002800
def test_writelines(self):
    # writelines() accepts a plain list of strings.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2808
def test_writelines_userlist(self):
    # writelines() accepts a UserList, not only a real list.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    lines = UserList(['ab', 'cd', 'ef'])
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2816
def test_writelines_error(self):
    # Non-string items, None, and bytes are all rejected with TypeError.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_argument)
2822
def test_issue1395_1(self):
    # Read one char at a time; the result must equal the normalized text.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    # Keep calling read(1) until it returns the empty string.
    chars = list(iter(lambda: wrapper.read(1), ""))
    self.assertEqual("".join(chars), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002834
def test_issue1395_2(self):
    # Same check with a chunk size of 4 and 4-character reads.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Keep calling read(4) until it returns the empty string.
    pieces = list(iter(lambda: wrapper.read(4), ""))
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002846
def test_issue1395_3(self):
    # Mix fixed-size reads with readline() on a 4-character chunk size.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # List elements are evaluated left to right, so the calls happen in
    # this order: two read(4) calls followed by three readline() calls.
    pieces = [
        wrapper.read(4),
        wrapper.read(4),
        wrapper.readline(),
        wrapper.readline(),
        wrapper.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002857
def test_issue1395_4(self):
    # A short read followed by a read-to-end on a 4-character chunk size.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002865
def test_issue1395_5(self):
    # A position obtained from tell() must survive a seek away and back.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)  # advance past the first four characters
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002875
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2881
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            writer.tell()
        with self.open(path, 'rb') as raw:
            on_disk = raw.read()
        self.assertEqual(on_disk, 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as appender:
            appender.write('xxx')
        with self.open(path, 'rb') as raw:
            on_disk = raw.read()
        self.assertEqual(on_disk, 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002896
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=codec) as editor:
            # Write past the existing text first, then overwrite it.
            editor.seek(end_of_text)
            editor.write('zzz')
            editor.seek(0)
            editor.write('bbb')
        with self.open(path, 'rb') as raw:
            on_disk = raw.read()
        self.assertEqual(on_disk, 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002911
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=codec) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as raw:
            on_disk = raw.read()
        self.assertEqual(on_disk, 'aaaxxx'.encode(codec))
2924
def test_errors_property(self):
    # The errors attribute is "strict" unless another handler is passed
    # to open().
    expectations = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in expectations:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2930
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            line = "Thread%03d\n" % n
            # Every worker blocks here until the event is set.
            go.wait()
            f.write(line)

        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=run, args=(index,)))
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        written = f.read()
    # Each thread's line must appear exactly once.
    for index in range(thread_count):
        self.assertEqual(written.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002949
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    # Filled in by bad_flush() with the closed flags seen at flush time.
    seen_at_flush = []

    def bad_flush():
        seen_at_flush[:] = [wrapper.closed, wrapper.buffer.closed]
        raise OSError()

    wrapper.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
    self.assertTrue(wrapper.buffer.closed)
    self.assertTrue(seen_at_flush)  # flush() called
    wrapper_was_closed, buffer_was_closed = seen_at_flush
    self.assertFalse(wrapper_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    wrapper.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002966
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, the close error
    # propagates with the flush error chained as its __context__.
    def bad_flush():
        raise OSError('flush')

    def bad_close():
        raise OSError('close')

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(OSError) as caught:  # exception not swallowed
        wrapper.close()
    close_error = caught.exception
    self.assertEqual(close_error.args, ('close',))
    flush_error = close_error.__context__
    self.assertIsInstance(flush_error, OSError)
    self.assertEqual(flush_error.args, ('flush',))
    self.assertFalse(wrapper.closed)
2982
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Both hooks raise NameError by referring to names that do not exist.
    def bad_flush():
        raise non_existing_flush

    def bad_close():
        raise non_existing_close

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.flush = bad_flush
    with self.assertRaises(NameError) as caught:  # exception not swallowed
        wrapper.close()
    close_error = caught.exception
    self.assertIn('non_existing_close', str(close_error))
    flush_error = close_error.__context__
    self.assertIsInstance(flush_error, NameError)
    self.assertIn('non_existing_flush', str(flush_error))
    self.assertFalse(wrapper.closed)
2999
def test_multi_close(self):
    # Closing repeatedly is allowed; flushing afterwards is not.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
3006
def test_unseekable(self):
    # tell() and seek() on a wrapper over an unseekable buffer must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3011
def test_readonly_attributes(self):
    # Assigning to the wrapper's buffer attribute must raise AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        setattr(wrapper, "buffer", replacement)
3017
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    rest_of_line = wrapper.readline()
    self.assertEqual(rest_of_line, 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3028
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for fragment in ('1', '23\n4', '5'):
        wrapper.write(fragment)
    # No flush() was issued, yet everything has reached the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3038
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_called = []
    write_called = []

    class BufferedWriter(self.BufferedWriter):
        # Records every flush() and write() call before delegating.
        def flush(self, *args, **kwargs):
            flush_called.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_called.append(True)
            return super().write(*args, **kwargs)

    data = b"a"
    rawio = self.BytesIO()
    # Buffer is twice the payload size, so one write does not overflow it.
    bufio = BufferedWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_called)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    write_called.clear()  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
3070
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # Each call gets a fresh wrapper over a StringIO.
    calls = (("read", (1,)), ("readline", ()), ("read", ()))
    for method_name, args in calls:
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(wrapper, method_name), *args)
3080
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        # Temporarily mark the codec as a text encoding; the flag is
        # restored in the finally clause whether construction works or not.
        quopri._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            quopri._is_text_encoding = False

    # Crash when decoder returns non-string
    # Each call gets a fresh wrapper.
    calls = (("read", (1,)), ("readline", ()), ("read", ()))
    for method_name, args in calls:
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name), *args)
3100
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # Runs a child interpreter whose script builds a TextIOWrapper from a
    # __del__ method, and returns whatever assert_python_ok() returns.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3121
def test_create_at_shutdown_without_encoding(self):
    # With no explicit encoding the child either prints "ok" or reports
    # the implementation-specific shutdown error on stderr.
    rc, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
    else:
        # Can error out with a RuntimeError if the module state
        # isn't found.
        self.assertIn(self.shutdown_error, err.decode())
3130
def test_create_at_shutdown_with_encoding(self):
    # With an explicit encoding and error handler the child must print
    # "ok" and leave stderr empty.
    wrapper_kwargs = {'encoding': 'utf-8', 'errors': 'strict'}
    rc, out, err = self._check_create_at_shutdown(**wrapper_kwargs)
    self.assertFalse(err)
    self.assertEqual("ok", out.decode().strip())
3136
def test_read_byteslike(self):
    # The wrapper must decode memoryviews returned by the buffer's reads.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    expected_bytes = _to_memoryview(source.getvalue()).tobytes()

    self.assertEqual(wrapper.read(200), expected_bytes.decode('utf-8'))
3147
def test_issue22849(self):
    # Build wrappers over a minimal object, first without and then with a
    # tell() method; the test passes if none of this crashes.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    # Failures while F has no tell() are deliberately ignored.
    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    F.tell = lambda x: 0
    t = self.TextIOWrapper(F(), encoding='utf-8')
3162
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003163
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read methods return memoryviews
       rather than bytes.

       *len_* defaults to -1 (read everything that is left), matching
       io.BytesIO, so a caller that invokes read() or read1() with no
       argument gets a memoryview instead of a TypeError.'''

    def read1(self, len_=-1):
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        return _to_memoryview(super().read(len_))


def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview.

    The data is loaded into an array of C ints, so trailing bytes that do
    not fill a whole item are dropped from the result.'''

    arr = array.array('i')
    # Largest prefix length that is a multiple of the item size.
    usable = len(buf) - len(buf) % arr.itemsize
    arr.frombytes(buf[:usable])
    return memoryview(arr)
3180 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003181
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003182
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with the `io` module, plus a few
    # tests that appear only in this class.
    io = io
    # Text looked for on the child's stderr by the shutdown test.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialization leaves the wrapper unusable, and an
        # object that never ran __init__ cannot be repr()'d.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(uninitialized)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cycle collector can free the object.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
        self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3227
3228
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with the `pyio` module.
    io = pyio
    # Text looked for on the child's stderr by the shutdown test.
    # An earlier expectation was "LookupError: unknown encoding: ascii".
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003233
3234
3235class IncrementalNewlineDecoderTest(unittest.TestCase):
3236
3237 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003238 # UTF-8 specific tests for a newline decoder
3239 def _check_decode(b, s, **kwargs):
3240 # We exercise getstate() / setstate() as well as decode()
3241 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003242 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003243 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003244 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003245
Antoine Pitrou180a3362008-12-14 16:36:46 +00003246 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003247
Antoine Pitrou180a3362008-12-14 16:36:46 +00003248 _check_decode(b'\xe8', "")
3249 _check_decode(b'\xa2', "")
3250 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003251
Antoine Pitrou180a3362008-12-14 16:36:46 +00003252 _check_decode(b'\xe8', "")
3253 _check_decode(b'\xa2', "")
3254 _check_decode(b'\x88', "\u8888")
3255
3256 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003257 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3258
Antoine Pitrou180a3362008-12-14 16:36:46 +00003259 decoder.reset()
3260 _check_decode(b'\n', "\n")
3261 _check_decode(b'\r', "")
3262 _check_decode(b'', "\n", final=True)
3263 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003264
Antoine Pitrou180a3362008-12-14 16:36:46 +00003265 _check_decode(b'\r', "")
3266 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003267
Antoine Pitrou180a3362008-12-14 16:36:46 +00003268 _check_decode(b'\r\r\n', "\n\n")
3269 _check_decode(b'\r', "")
3270 _check_decode(b'\r', "\n")
3271 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003272
Antoine Pitrou180a3362008-12-14 16:36:46 +00003273 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3274 _check_decode(b'\xe8\xa2\x88', "\u8888")
3275 _check_decode(b'\n', "\n")
3276 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3277 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003279 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003280 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003281 if encoding is not None:
3282 encoder = codecs.getincrementalencoder(encoding)()
3283 def _decode_bytewise(s):
3284 # Decode one byte at a time
3285 for b in encoder.encode(s):
3286 result.append(decoder.decode(bytes([b])))
3287 else:
3288 encoder = None
3289 def _decode_bytewise(s):
3290 # Decode one char at a time
3291 for c in s:
3292 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003293 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003294 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003295 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003296 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003297 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003298 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003299 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003300 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003301 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003302 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003303 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003304 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003305 input = "abc"
3306 if encoder is not None:
3307 encoder.reset()
3308 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003309 self.assertEqual(decoder.decode(input), "abc")
3310 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003311
3312 def test_newline_decoder(self):
3313 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003314 # None meaning the IncrementalNewlineDecoder takes unicode input
3315 # rather than bytes input
3316 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003317 'utf-16', 'utf-16-le', 'utf-16-be',
3318 'utf-32', 'utf-32-le', 'utf-32-be',
3319 )
3320 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003321 decoder = enc and codecs.getincrementaldecoder(enc)()
3322 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3323 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003324 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003325 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3326 self.check_newline_decoding_utf8(decoder)
3327
Antoine Pitrou66913e22009-03-06 23:40:56 +00003328 def test_newline_bytes(self):
3329 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3330 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003331 self.assertEqual(dec.newlines, None)
3332 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3333 self.assertEqual(dec.newlines, None)
3334 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3335 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003336 dec = self.IncrementalNewlineDecoder(None, translate=False)
3337 _check(dec)
3338 dec = self.IncrementalNewlineDecoder(None, translate=True)
3339 _check(dec)
3340
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete test case that reuses the shared decoder checks unchanged."""
3343
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete test case that reuses the shared decoder checks unchanged."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00003346
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003347
Guido van Rossum01a27522007-03-07 01:00:12 +00003348# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003349
Guido van Rossum5abbf752007-08-27 17:39:33 +00003350class MiscIOTest(unittest.TestCase):
3351
def tearDown(self):
    # Remove the scratch file that tests in this class may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
3354
def test___all__(self):
    # Every name exported in __all__ must exist on the module. Apart from
    # open() and the SEEK_* constants, each export must be an Exception
    # subclass (error-like names) or an IOBase subclass (everything else).
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertIsNotNone(obj, name)
        if name == "open":
            continue
        elif "error" in name.lower() or name == "UnsupportedOperation":
            self.assertTrue(issubclass(obj, Exception), name)
        elif not name.startswith("SEEK_"):
            # Pass the name as the message, as the assertions above do,
            # so a failure says which export is wrong.
            self.assertTrue(issubclass(obj, self.IOBase), name)
Benjamin Peterson65676e42008-11-05 21:42:45 +00003365
def test_attributes(self):
    # Check the mode and name attributes reported at each layer of the
    # objects returned by open(). Files are managed with `with` so they
    # are closed even when an assertion fails.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Only the open() call sits inside check_warnings(): that is where the
    # DeprecationWarning for mode "U" is expected.
    with support.check_warnings(('', DeprecationWarning)):
        f = self.open(support.TESTFN, "U")
    with f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g shares f's descriptor without owning it (closefd=False); it is
        # closed first, while the descriptor is still open.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
3393
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, for text
    # and binary files at several buffering settings.
    configurations = []
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        configurations.extend([
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ])

    for kwargs in configurations:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # (method name, arguments, only checked when the method exists)
        probes = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readinto1", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
        )
        for method_name, args, optional in probes:
            if optional and not hasattr(f, method_name):
                continue
            self.assertRaises(ValueError, getattr(f, method_name), *args)
        # write() gets an empty payload of the type matching the mode.
        empty = b"" if "b" in kwargs['mode'] else ""
        self.assertRaises(ValueError, f.write, empty)
        self.assertRaises(ValueError, f.writelines, [])
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003438
def test_blockingioerror(self):
    # Various BlockingIOError issues: here, an exception that is part of
    # a reference cycle with its own argument must still be collectable.
    class Payload(str):
        pass
    payload = Payload("")
    exc = self.BlockingIOError(1, payload)
    # Tie the two objects together in a cycle.
    payload.b = exc
    exc.c = payload
    payload_ref = weakref.ref(payload)
    del payload, exc
    support.gc_collect()
    # The weak reference is dead only if the cycle was reclaimed.
    self.assertIsNone(payload_ref(), payload_ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003451
def test_abcs(self):
    # Test the visible base classes are ABCs, i.e. that each of them
    # has abc.ABCMeta as its metaclass.
    for base_name in ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase"):
        self.assertIsInstance(getattr(self, base_name), abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003458
def _check_abc_inheritance(self, abcmodule):
    # Open the test file in three ways and check which of the ABCs
    # found on *abcmodule* each resulting object is an instance of.
    # Every object must be an IOBase and match exactly one of the three
    # more specific ABCs.
    specific_abcs = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    cases = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra_kwargs, expected_abc in cases:
        with self.open(support.TESTFN, mode, **extra_kwargs) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for abc_name in specific_abcs:
                if abc_name == expected_abc:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(f, getattr(abcmodule, abc_name))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003475
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs; the ABCs
    # are looked up as attributes of this test case itself.
    abc_namespace = self
    self._check_abc_inheritance(abc_namespace)
3479
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module, whichever implementation is under test.
    official_abcs = io
    self._check_abc_inheritance(official_abcs)
3484
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file, drop the only reference to it without closing it, and
    # check that a ResourceWarning mentioning the object's repr is issued.
    # NOTE(review): this calls the builtin open() rather than self.open,
    # so every subclass exercises the same implementation here -- confirm
    # this is intended.
    f = open(*args, **kwargs)
    expected_repr = repr(f)
    with self.assertWarns(ResourceWarning) as caught:
        del f
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
3492
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text mode are all checked.
    for mode, extra_kwargs in (("wb", {"buffering": 0}),
                               ("wb", {}),
                               ("w", {})):
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra_kwargs)
3497
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Run the deallocation-warning check on the read end of a pipe, then
    # check that no warning is issued when the file object does not own
    # its descriptor (closefd=False).
    opened_fds = []

    def close_all():
        for fd in opened_fds:
            try:
                os.close(fd)
            except OSError as exc:
                # A descriptor that is already closed is not an error.
                if exc.errno != errno.EBADF:
                    raise
    self.addCleanup(close_all)

    read_fd, write_fd = os.pipe()
    opened_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)
    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    opened_fds.extend((read_fd, write_fd))
    with support.check_no_resource_warning(self):
        open(read_fd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003516
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary and text mode are all checked.
    for mode, extra_kwargs in (("rb", {"buffering": 0}),
                               ("rb", {}),
                               ("r", {})):
        self._check_warn_on_dealloc_fd(mode, **extra_kwargs)
3521
3522
def test_pickling(self):
    # Pickling file objects is forbidden
    # For each base mode: text, binary, then unbuffered binary.
    open_configs = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_configs.append({"mode": base_mode})
        open_configs.append({"mode": binary_mode})
        open_configs.append({"mode": binary_mode, "buffering": 0})

    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for open_kwargs in open_configs:
        for protocol in protocols:
            with self.open(support.TESTFN, **open_kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
3539
def test_nonblock_pipe_write_bigbuf(self):
    # Non-blocking pipe round trip with a 16 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=16 * 1024)
3542
def test_nonblock_pipe_write_smallbuf(self):
    # Non-blocking pipe round trip with a 1 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=1024)
3545
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    # Write through a non-blocking pipe with buffered objects of size
    # *bufsize* on both ends, and check that, despite the writes being
    # interrupted by BlockingIOError, the bytes read back are exactly
    # the bytes accounted for as written.
    sent = []
    received = []
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes. For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                # Keep writing N-byte messages (a repeated lowercase
                # letter) until the write raises BlockingIOError.
                while True:
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only count the part of the last message that was
                # reported as written.
                sent[-1] = sent[-1][:e.characters_written]
                # Read what is available, then write a short marker.
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Flush until it succeeds, reading from the other end each time
        # the flush reports that it would block.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect the remaining data: keep calling rf.read() until it
        # returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertEqual(sent, received)
    # Leaving the "with" block must have closed both file objects.
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
3597
def test_create_fail(self):
    # 'x' mode fails if file is existing
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
3603
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as writer:
        writer.write(payload)
    # Read the file back to check the data was actually stored.
    with self.open(support.TESTFN, 'rb') as reader:
        content = reader.read()
    self.assertEqual(payload, content)
3610
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
3614
3615
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest checks with ``io`` bound to the "io" module and
    # adds tests that only apply to that implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() returns far more data than the target buffer can hold;
        # readinto() must raise ValueError.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        target = bytearray(2)
        reader = OversizedReader()
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        # *stream_name* is the name of the sys attribute to write to.
        script = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", script)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only hold the characters written
            # by the script itself.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads".format(stream_name=stream_name))
        self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
3669
3670
class PyMiscIOTest(MiscIOTest):
    # Runs the inherited MiscIOTest checks with ``io`` bound to the
    # ``pyio`` module (presumably the pure-Python implementation mentioned
    # in the note at the top of the file -- confirm against the imports).
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003673
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003674
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how I/O on pipes behaves when interrupted by SIGALRM.

    The implementation under test is reached through ``self.io``, which
    subclasses provide as a class attribute.  Every helper that arms an
    alarm cancels it again in a ``finally`` clause, so that a failing
    check cannot leave a pending SIGALRM behind once tearDown() has
    restored the previous handler.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler of these tests: raise ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data that is repeated to build the written
        payload, *bytes* is the byte string expected to be read back from
        the pipe, and *fdopen_kwargs* are passed to ``self.io.open``.
        """
        read_results = []
        def _read():
            # Keep SIGALRM away from the reader thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup code below can
        # tell whether the file object was ever created.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below. Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of a write issued from a signal handler while
        the same object is being written to.

        *data* is written repeatedly to the write end of a pipe opened
        with *fdopen_kwargs*; the SIGALRM handler writes it once more and
        then raises ZeroDivisionError.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Make sure no alarm can fire during or after the cleanup.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to a str, and
        *fdopen_kwargs* are passed to ``self.io.open``.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup code below can
        # tell whether the file object was ever created.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Make sure no alarm can fire during or after the cleanup.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated ``support.PIPE_MAX_SIZE`` times to build the
        written payload, and *fdopen_kwargs* are passed to
        ``self.io.open``.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish. This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch handlers and arm a second alarm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup code below can
        # tell whether the file object was ever created.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Make sure no alarm can fire during or after the cleanup.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3882
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003883
class CSignalsTest(SignalsTest):
    # Runs the inherited SignalsTest checks with ``self.io`` bound to the
    # "io" module.
    io = io
3886
class PySignalsTest(SignalsTest):
    # Runs the inherited SignalsTest checks with ``self.io`` bound to the
    # ``pyio`` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    # The inherited test names are overridden with None for that purpose.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3894
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003895
def load_tests(*args):
    """Build the suite holding every test case class of this file.

    Before the suite is assembled, each class whose name starts with "C"
    receives the public names of the ``io`` module plus the "C"-prefixed
    mock classes as class attributes; each class whose name starts with
    "Py" receives the same names taken from ``pyio`` plus the
    "Py"-prefixed mocks.  Other classes are left untouched.

    The positional arguments are accepted and ignored.

    Returns a ``unittest.TestSuite``.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and no longer available in recent
    # Python versions; the default loader collects the same test methods.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003931
if __name__ == "__main__":
    # Run the tests when this file is executed as a script.
    unittest.main()