blob: 5111882a0258e90a408104344dff5467d4fac7ac [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Martin Panter6bb91f32016-05-28 00:41:57 +000048try:
49 import ctypes
50except ImportError:
51 def byteslike(*pos, **kw):
52 return array.array("b", bytes(*pos, **kw))
53else:
54 def byteslike(*pos, **kw):
55 """Create a bytes-like object having no string or sequence methods"""
56 data = bytes(*pos, **kw)
57 obj = EmptyStruct()
58 ctypes.resize(obj, len(data))
59 memoryview(obj).cast("B")[:] = data
60 return obj
61 class EmptyStruct(ctypes.Structure):
62 pass
63
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064def _default_chunk_size():
65 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000066 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000067 return f._CHUNK_SIZE
68
69
Antoine Pitrou328ec742010-09-14 18:37:24 +000070class MockRawIOWithoutRead:
71 """A RawIO implementation without read(), so as to exercise the default
72 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000073
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000074 def __init__(self, read_stack=()):
75 self._read_stack = list(read_stack)
76 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000078 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000079
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000082 return len(b)
83
84 def writable(self):
85 return True
86
Guido van Rossum68bbcd22007-02-27 17:19:33 +000087 def fileno(self):
88 return 42
89
90 def readable(self):
91 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000094 return True
95
Guido van Rossum01a27522007-03-07 01:00:12 +000096 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000098
99 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000100 return 0 # same comment as above
101
102 def readinto(self, buf):
103 self._reads += 1
104 max_len = len(buf)
105 try:
106 data = self._read_stack[0]
107 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000108 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000109 return 0
110 if data is None:
111 del self._read_stack[0]
112 return None
113 n = len(data)
114 if len(data) <= max_len:
115 del self._read_stack[0]
116 buf[:n] = data
117 return n
118 else:
119 buf[:] = data[:max_len]
120 self._read_stack[0] = data[max_len:]
121 return max_len
122
123 def truncate(self, pos=None):
124 return pos
125
Antoine Pitrou328ec742010-09-14 18:37:24 +0000126class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
127 pass
128
129class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
130 pass
131
132
133class MockRawIO(MockRawIOWithoutRead):
134
135 def read(self, n=None):
136 self._reads += 1
137 try:
138 return self._read_stack.pop(0)
139 except:
140 self._extraneous_reads += 1
141 return b""
142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000143class CMockRawIO(MockRawIO, io.RawIOBase):
144 pass
145
146class PyMockRawIO(MockRawIO, pyio.RawIOBase):
147 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000148
Guido van Rossuma9e20242007-03-08 00:43:48 +0000149
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000150class MisbehavedRawIO(MockRawIO):
151 def write(self, b):
152 return super().write(b) * 2
153
154 def read(self, n=None):
155 return super().read(n) * 2
156
157 def seek(self, pos, whence):
158 return -123
159
160 def tell(self):
161 return -456
162
163 def readinto(self, buf):
164 super().readinto(buf)
165 return len(buf) * 5
166
167class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
168 pass
169
170class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
171 pass
172
173
174class CloseFailureIO(MockRawIO):
175 closed = 0
176
177 def close(self):
178 if not self.closed:
179 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200180 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181
182class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
183 pass
184
185class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
186 pass
187
188
189class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
191 def __init__(self, data):
192 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000194
195 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000197 self.read_history.append(None if res is None else len(res))
198 return res
199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def readinto(self, b):
201 res = super().readinto(b)
202 self.read_history.append(res)
203 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000205class CMockFileIO(MockFileIO, io.BytesIO):
206 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208class PyMockFileIO(MockFileIO, pyio.BytesIO):
209 pass
210
211
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000212class MockUnseekableIO:
213 def seekable(self):
214 return False
215
216 def seek(self, *args):
217 raise self.UnsupportedOperation("not seekable")
218
219 def tell(self, *args):
220 raise self.UnsupportedOperation("not seekable")
221
Martin Panter754aab22016-03-31 07:21:56 +0000222 def truncate(self, *args):
223 raise self.UnsupportedOperation("not seekable")
224
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000225class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
226 UnsupportedOperation = io.UnsupportedOperation
227
228class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
229 UnsupportedOperation = pyio.UnsupportedOperation
230
231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000232class MockNonBlockWriterIO:
233
234 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000235 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238 def pop_written(self):
239 s = b"".join(self._write_stack)
240 self._write_stack[:] = []
241 return s
242
243 def block_on(self, char):
244 """Block when a given char is encountered."""
245 self._blocker_char = char
246
247 def readable(self):
248 return True
249
250 def seekable(self):
251 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000252
Guido van Rossum01a27522007-03-07 01:00:12 +0000253 def writable(self):
254 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000255
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000256 def write(self, b):
257 b = bytes(b)
258 n = -1
259 if self._blocker_char:
260 try:
261 n = b.index(self._blocker_char)
262 except ValueError:
263 pass
264 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100265 if n > 0:
266 # write data up to the first blocker
267 self._write_stack.append(b[:n])
268 return n
269 else:
270 # cancel blocker and indicate would block
271 self._blocker_char = None
272 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000273 self._write_stack.append(b)
274 return len(b)
275
276class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
277 BlockingIOError = io.BlockingIOError
278
279class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
280 BlockingIOError = pyio.BlockingIOError
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Guido van Rossum28524c72007-02-27 05:47:44 +0000283class IOTest(unittest.TestCase):
284
Neal Norwitze7789b12008-03-24 06:18:09 +0000285 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000287
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000288 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000289 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000290
Guido van Rossum28524c72007-02-27 05:47:44 +0000291 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293 f.truncate(0)
294 self.assertEqual(f.tell(), 5)
295 f.seek(0)
296
297 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(0), 0)
299 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000300 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000301 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000302 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000303 buffer = bytearray(b" world\n\n\n")
304 self.assertEqual(f.write(buffer), 9)
305 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000307 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.seek(-1, 2), 13)
309 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000310
Guido van Rossum87429772007-04-10 21:06:59 +0000311 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000312 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000313 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000314
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 def read_ops(self, f, buffered=False):
316 data = f.read(5)
317 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000318 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000320 self.assertEqual(bytes(data), b" worl")
321 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000322 self.assertEqual(f.readinto(data), 2)
323 self.assertEqual(len(data), 5)
324 self.assertEqual(data[:2], b"d\n")
325 self.assertEqual(f.seek(0), 0)
326 self.assertEqual(f.read(20), b"hello world\n")
327 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000328 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000329 self.assertEqual(f.seek(-6, 2), 6)
330 self.assertEqual(f.read(5), b"world")
331 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000332 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000333 self.assertEqual(f.seek(-6, 1), 5)
334 self.assertEqual(f.read(5), b" worl")
335 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000336 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 if buffered:
338 f.seek(0)
339 self.assertEqual(f.read(), b"hello world\n")
340 f.seek(6)
341 self.assertEqual(f.read(), b"world\n")
342 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000343 f.seek(0)
344 data = byteslike(5)
345 self.assertEqual(f.readinto1(data), 5)
346 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000347
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 LARGE = 2**31
349
Guido van Rossum53807da2007-04-10 19:01:47 +0000350 def large_file_ops(self, f):
351 assert f.readable()
352 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000353 self.assertEqual(f.seek(self.LARGE), self.LARGE)
354 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000355 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000356 self.assertEqual(f.tell(), self.LARGE + 3)
357 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000358 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 2)
360 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000363 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
364 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000365 self.assertEqual(f.read(2), b"x")
366
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367 def test_invalid_operations(self):
368 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000369 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000371 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 self.assertRaises(exc, fp.read)
373 self.assertRaises(exc, fp.readline)
374 with self.open(support.TESTFN, "wb", buffering=0) as fp:
375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "rb", buffering=0) as fp:
378 self.assertRaises(exc, fp.write, b"blah")
379 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000380 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, "blah")
385 self.assertRaises(exc, fp.writelines, ["blah\n"])
386 # Non-zero seeking from current or end pos
387 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
388 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000389
Martin Panter754aab22016-03-31 07:21:56 +0000390 def test_optional_abilities(self):
391 # Test for OSError when optional APIs are not supported
392 # The purpose of this test is to try fileno(), reading, writing and
393 # seeking operations with various objects that indicate they do not
394 # support these operations.
395
396 def pipe_reader():
397 [r, w] = os.pipe()
398 os.close(w) # So that read() is harmless
399 return self.FileIO(r, "r")
400
401 def pipe_writer():
402 [r, w] = os.pipe()
403 self.addCleanup(os.close, r)
404 # Guarantee that we can write into the pipe without blocking
405 thread = threading.Thread(target=os.read, args=(r, 100))
406 thread.start()
407 self.addCleanup(thread.join)
408 return self.FileIO(w, "w")
409
410 def buffered_reader():
411 return self.BufferedReader(self.MockUnseekableIO())
412
413 def buffered_writer():
414 return self.BufferedWriter(self.MockUnseekableIO())
415
416 def buffered_random():
417 return self.BufferedRandom(self.BytesIO())
418
419 def buffered_rw_pair():
420 return self.BufferedRWPair(self.MockUnseekableIO(),
421 self.MockUnseekableIO())
422
423 def text_reader():
424 class UnseekableReader(self.MockUnseekableIO):
425 writable = self.BufferedIOBase.writable
426 write = self.BufferedIOBase.write
427 return self.TextIOWrapper(UnseekableReader(), "ascii")
428
429 def text_writer():
430 class UnseekableWriter(self.MockUnseekableIO):
431 readable = self.BufferedIOBase.readable
432 read = self.BufferedIOBase.read
433 return self.TextIOWrapper(UnseekableWriter(), "ascii")
434
435 tests = (
436 (pipe_reader, "fr"), (pipe_writer, "fw"),
437 (buffered_reader, "r"), (buffered_writer, "w"),
438 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
439 (text_reader, "r"), (text_writer, "w"),
440 (self.BytesIO, "rws"), (self.StringIO, "rws"),
441 )
442 for [test, abilities] in tests:
443 if test is pipe_writer and not threading:
444 continue # Skip subtest that uses a background thread
445 with self.subTest(test), test() as obj:
446 readable = "r" in abilities
447 self.assertEqual(obj.readable(), readable)
448 writable = "w" in abilities
449 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000450
451 if isinstance(obj, self.TextIOBase):
452 data = "3"
453 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
454 data = b"3"
455 else:
456 self.fail("Unknown base class")
457
458 if "f" in abilities:
459 obj.fileno()
460 else:
461 self.assertRaises(OSError, obj.fileno)
462
463 if readable:
464 obj.read(1)
465 obj.read()
466 else:
467 self.assertRaises(OSError, obj.read, 1)
468 self.assertRaises(OSError, obj.read)
469
470 if writable:
471 obj.write(data)
472 else:
473 self.assertRaises(OSError, obj.write, data)
474
Martin Panter3ee147f2016-03-31 21:05:31 +0000475 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000476 pipe_reader, pipe_writer):
477 # Pipes seem to appear as seekable on Windows
478 continue
479 seekable = "s" in abilities
480 self.assertEqual(obj.seekable(), seekable)
481
Martin Panter754aab22016-03-31 07:21:56 +0000482 if seekable:
483 obj.tell()
484 obj.seek(0)
485 else:
486 self.assertRaises(OSError, obj.tell)
487 self.assertRaises(OSError, obj.seek, 0)
488
489 if writable and seekable:
490 obj.truncate()
491 obj.truncate(0)
492 else:
493 self.assertRaises(OSError, obj.truncate)
494 self.assertRaises(OSError, obj.truncate, 0)
495
Antoine Pitrou13348842012-01-29 18:36:34 +0100496 def test_open_handles_NUL_chars(self):
497 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300498 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
499 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100500
Guido van Rossum28524c72007-02-27 05:47:44 +0000501 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000502 with self.open(support.TESTFN, "wb", buffering=0) as f:
503 self.assertEqual(f.readable(), False)
504 self.assertEqual(f.writable(), True)
505 self.assertEqual(f.seekable(), True)
506 self.write_ops(f)
507 with self.open(support.TESTFN, "rb", buffering=0) as f:
508 self.assertEqual(f.readable(), True)
509 self.assertEqual(f.writable(), False)
510 self.assertEqual(f.seekable(), True)
511 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000512
Guido van Rossum87429772007-04-10 21:06:59 +0000513 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000514 with self.open(support.TESTFN, "wb") as f:
515 self.assertEqual(f.readable(), False)
516 self.assertEqual(f.writable(), True)
517 self.assertEqual(f.seekable(), True)
518 self.write_ops(f)
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.readable(), True)
521 self.assertEqual(f.writable(), False)
522 self.assertEqual(f.seekable(), True)
523 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000524
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000525 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000526 with self.open(support.TESTFN, "wb") as f:
527 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
528 with self.open(support.TESTFN, "rb") as f:
529 self.assertEqual(f.readline(), b"abc\n")
530 self.assertEqual(f.readline(10), b"def\n")
531 self.assertEqual(f.readline(2), b"xy")
532 self.assertEqual(f.readline(4), b"zzy\n")
533 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000534 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 self.assertRaises(TypeError, f.readline, 5.3)
536 with self.open(support.TESTFN, "r") as f:
537 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000538
Guido van Rossum28524c72007-02-27 05:47:44 +0000539 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000541 self.write_ops(f)
542 data = f.getvalue()
543 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000545 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000546
Guido van Rossum53807da2007-04-10 19:01:47 +0000547 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000548 # On Windows and Mac OSX this test comsumes large resources; It takes
549 # a long time to build the >2GB file and takes >2GB of disk space
550 # therefore the resource must be enabled to run this test.
551 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600552 support.requires(
553 'largefile',
554 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 with self.open(support.TESTFN, "w+b", 0) as f:
556 self.large_file_ops(f)
557 with self.open(support.TESTFN, "w+b") as f:
558 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000559
560 def test_with_open(self):
561 for bufsize in (0, 1, 100):
562 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000563 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000564 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000565 self.assertEqual(f.closed, True)
566 f = None
567 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000568 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000569 1/0
570 except ZeroDivisionError:
571 self.assertEqual(f.closed, True)
572 else:
573 self.fail("1/0 didn't raise an exception")
574
Antoine Pitrou08838b62009-01-21 00:55:13 +0000575 # issue 5008
576 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000577 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000578 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000579 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000580 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000582 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000583 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300584 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000585
Guido van Rossum87429772007-04-10 21:06:59 +0000586 def test_destructor(self):
587 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000589 def __del__(self):
590 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000591 try:
592 f = super().__del__
593 except AttributeError:
594 pass
595 else:
596 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000597 def close(self):
598 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000599 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000600 def flush(self):
601 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000603 with support.check_warnings(('', ResourceWarning)):
604 f = MyFileIO(support.TESTFN, "wb")
605 f.write(b"xxx")
606 del f
607 support.gc_collect()
608 self.assertEqual(record, [1, 2, 3])
609 with self.open(support.TESTFN, "rb") as f:
610 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000611
612 def _check_base_destructor(self, base):
613 record = []
614 class MyIO(base):
615 def __init__(self):
616 # This exercises the availability of attributes on object
617 # destruction.
618 # (in the C version, close() is called by the tp_dealloc
619 # function, not by __del__)
620 self.on_del = 1
621 self.on_close = 2
622 self.on_flush = 3
623 def __del__(self):
624 record.append(self.on_del)
625 try:
626 f = super().__del__
627 except AttributeError:
628 pass
629 else:
630 f()
631 def close(self):
632 record.append(self.on_close)
633 super().close()
634 def flush(self):
635 record.append(self.on_flush)
636 super().flush()
637 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000638 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000639 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000640 self.assertEqual(record, [1, 2, 3])
641
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000642 def test_IOBase_destructor(self):
643 self._check_base_destructor(self.IOBase)
644
645 def test_RawIOBase_destructor(self):
646 self._check_base_destructor(self.RawIOBase)
647
648 def test_BufferedIOBase_destructor(self):
649 self._check_base_destructor(self.BufferedIOBase)
650
651 def test_TextIOBase_destructor(self):
652 self._check_base_destructor(self.TextIOBase)
653
Guido van Rossum87429772007-04-10 21:06:59 +0000654 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000655 with self.open(support.TESTFN, "wb") as f:
656 f.write(b"xxx")
657 with self.open(support.TESTFN, "rb") as f:
658 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000659
Guido van Rossumd4103952007-04-12 05:44:49 +0000660 def test_array_writes(self):
661 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000662 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000663 def check(f):
664 with f:
665 self.assertEqual(f.write(a), n)
666 f.writelines((a,))
667 check(self.BytesIO())
668 check(self.FileIO(support.TESTFN, "w"))
669 check(self.BufferedWriter(self.MockRawIO()))
670 check(self.BufferedRandom(self.MockRawIO()))
671 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000672
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000673 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000674 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000675 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 def test_read_closed(self):
678 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000679 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000680 with self.open(support.TESTFN, "r") as f:
681 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000682 self.assertEqual(file.read(), "egg\n")
683 file.seek(0)
684 file.close()
685 self.assertRaises(ValueError, file.read)
686
687 def test_no_closefd_with_filename(self):
688 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000689 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000690
691 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000692 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000693 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000695 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000697 self.assertEqual(file.buffer.raw.closefd, False)
698
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000699 def test_garbage_collection(self):
700 # FileIO objects are collected, and collecting them flushes
701 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000702 with support.check_warnings(('', ResourceWarning)):
703 f = self.FileIO(support.TESTFN, "wb")
704 f.write(b"abcxxx")
705 f.f = f
706 wr = weakref.ref(f)
707 del f
708 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300709 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000710 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000712
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000713 def test_unbounded_file(self):
714 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
715 zero = "/dev/zero"
716 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000717 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000718 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000719 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000720 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000721 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000722 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000723 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000724 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000725 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000726 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000727 self.assertRaises(OverflowError, f.read)
728
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200729 def check_flush_error_on_close(self, *args, **kwargs):
730 # Test that the file is closed despite failed flush
731 # and that flush() is called before file closed.
732 f = self.open(*args, **kwargs)
733 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000734 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200735 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200736 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000737 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200738 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600739 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200740 self.assertTrue(closed) # flush() called
741 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200742 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200743
744 def test_flush_error_on_close(self):
745 # raw file
746 # Issue #5700: io.FileIO calls flush() after file closed
747 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
748 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
749 self.check_flush_error_on_close(fd, 'wb', buffering=0)
750 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
751 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
752 os.close(fd)
753 # buffered io
754 self.check_flush_error_on_close(support.TESTFN, 'wb')
755 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
756 self.check_flush_error_on_close(fd, 'wb')
757 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
758 self.check_flush_error_on_close(fd, 'wb', closefd=False)
759 os.close(fd)
760 # text io
761 self.check_flush_error_on_close(support.TESTFN, 'w')
762 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
763 self.check_flush_error_on_close(fd, 'w')
764 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
765 self.check_flush_error_on_close(fd, 'w', closefd=False)
766 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000767
768 def test_multi_close(self):
769 f = self.open(support.TESTFN, "wb", buffering=0)
770 f.close()
771 f.close()
772 f.close()
773 self.assertRaises(ValueError, f.flush)
774
Antoine Pitrou328ec742010-09-14 18:37:24 +0000775 def test_RawIOBase_read(self):
776 # Exercise the default RawIOBase.read() implementation (which calls
777 # readinto() internally).
778 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
779 self.assertEqual(rawio.read(2), b"ab")
780 self.assertEqual(rawio.read(2), b"c")
781 self.assertEqual(rawio.read(2), b"d")
782 self.assertEqual(rawio.read(2), None)
783 self.assertEqual(rawio.read(2), b"ef")
784 self.assertEqual(rawio.read(2), b"g")
785 self.assertEqual(rawio.read(2), None)
786 self.assertEqual(rawio.read(2), b"")
787
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400788 def test_types_have_dict(self):
789 test = (
790 self.IOBase(),
791 self.RawIOBase(),
792 self.TextIOBase(),
793 self.StringIO(),
794 self.BytesIO()
795 )
796 for obj in test:
797 self.assertTrue(hasattr(obj, "__dict__"))
798
Ross Lagerwall59142db2011-10-31 20:34:46 +0200799 def test_opener(self):
800 with self.open(support.TESTFN, "w") as f:
801 f.write("egg\n")
802 fd = os.open(support.TESTFN, os.O_RDONLY)
803 def opener(path, flags):
804 return fd
805 with self.open("non-existent", "r", opener=opener) as f:
806 self.assertEqual(f.read(), "egg\n")
807
Barry Warsaw480e2852016-06-08 17:47:26 -0400808 def test_bad_opener_negative_1(self):
809 # Issue #27066.
810 def badopener(fname, flags):
811 return -1
812 with self.assertRaises(ValueError) as cm:
813 open('non-existent', 'r', opener=badopener)
814 self.assertEqual(str(cm.exception), 'opener returned -1')
815
816 def test_bad_opener_other_negative(self):
817 # Issue #27066.
818 def badopener(fname, flags):
819 return -2
820 with self.assertRaises(ValueError) as cm:
821 open('non-existent', 'r', opener=badopener)
822 self.assertEqual(str(cm.exception), 'opener returned -2')
823
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200824 def test_fileio_closefd(self):
825 # Issue #4841
826 with self.open(__file__, 'rb') as f1, \
827 self.open(__file__, 'rb') as f2:
828 fileio = self.FileIO(f1.fileno(), closefd=False)
829 # .__init__() must not close f1
830 fileio.__init__(f2.fileno(), closefd=False)
831 f1.readline()
832 # .close() must not close f2
833 fileio.close()
834 f2.readline()
835
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300836 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200837 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300838 with self.assertRaises(ValueError):
839 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300840
841 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200842 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300843 with self.assertRaises(ValueError):
844 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300845
Martin Panter6bb91f32016-05-28 00:41:57 +0000846 def test_buffered_readinto_mixin(self):
847 # Test the implementation provided by BufferedIOBase
848 class Stream(self.BufferedIOBase):
849 def read(self, size):
850 return b"12345"
851 read1 = read
852 stream = Stream()
853 for method in ("readinto", "readinto1"):
854 with self.subTest(method):
855 buffer = byteslike(5)
856 self.assertEqual(getattr(stream, method)(buffer), 5)
857 self.assertEqual(bytes(buffer), b"12345")
858
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200859
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000860class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200861
862 def test_IOBase_finalize(self):
863 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
864 # class which inherits IOBase and an object of this class are caught
865 # in a reference cycle and close() is already in the method cache.
866 class MyIO(self.IOBase):
867 def close(self):
868 pass
869
870 # create an instance to populate the method cache
871 MyIO()
872 obj = MyIO()
873 obj.obj = obj
874 wr = weakref.ref(obj)
875 del MyIO
876 del obj
877 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300878 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000879
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000880class PyIOTest(IOTest):
881 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000882
Guido van Rossuma9e20242007-03-08 00:43:48 +0000883
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700884@support.cpython_only
885class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700886
Gregory P. Smith054b0652015-04-14 12:58:05 -0700887 def test_RawIOBase_io_in_pyio_match(self):
888 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200889 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
890 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -0700891 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
892
893 def test_RawIOBase_pyio_in_io_match(self):
894 """Test that c RawIOBase class has all pyio RawIOBase methods"""
895 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
896 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
897
898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899class CommonBufferedTests:
900 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
901
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000902 def test_detach(self):
903 raw = self.MockRawIO()
904 buf = self.tp(raw)
905 self.assertIs(buf.detach(), raw)
906 self.assertRaises(ValueError, buf.detach)
907
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600908 repr(buf) # Should still work
909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000910 def test_fileno(self):
911 rawio = self.MockRawIO()
912 bufio = self.tp(rawio)
913
Ezio Melottib3aedd42010-11-20 19:04:17 +0000914 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000916 def test_invalid_args(self):
917 rawio = self.MockRawIO()
918 bufio = self.tp(rawio)
919 # Invalid whence
920 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200921 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922
923 def test_override_destructor(self):
924 tp = self.tp
925 record = []
926 class MyBufferedIO(tp):
927 def __del__(self):
928 record.append(1)
929 try:
930 f = super().__del__
931 except AttributeError:
932 pass
933 else:
934 f()
935 def close(self):
936 record.append(2)
937 super().close()
938 def flush(self):
939 record.append(3)
940 super().flush()
941 rawio = self.MockRawIO()
942 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000943 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000944 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +0000945 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000946
947 def test_context_manager(self):
948 # Test usability as a context manager
949 rawio = self.MockRawIO()
950 bufio = self.tp(rawio)
951 def _with():
952 with bufio:
953 pass
954 _with()
955 # bufio should now be closed, and using it a second time should raise
956 # a ValueError.
957 self.assertRaises(ValueError, _with)
958
959 def test_error_through_destructor(self):
960 # Test that the exception state is not modified by a destructor,
961 # even if close() fails.
962 rawio = self.CloseFailureIO()
963 def f():
964 self.tp(rawio).xyzzy
965 with support.captured_output("stderr") as s:
966 self.assertRaises(AttributeError, f)
967 s = s.getvalue().strip()
968 if s:
969 # The destructor *may* have printed an unraisable error, check it
970 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200971 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000972 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000973
Antoine Pitrou716c4442009-05-23 19:04:03 +0000974 def test_repr(self):
975 raw = self.MockRawIO()
976 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +0300977 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +0000978 self.assertEqual(repr(b), "<%s>" % clsname)
979 raw.name = "dummy"
980 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
981 raw.name = b"dummy"
982 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
983
Antoine Pitrou6be88762010-05-03 16:48:20 +0000984 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200985 # Test that buffered file is closed despite failed flush
986 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +0000987 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200988 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000989 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200990 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200991 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000992 raw.flush = bad_flush
993 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200994 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600995 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200996 self.assertTrue(raw.closed)
997 self.assertTrue(closed) # flush() called
998 self.assertFalse(closed[0]) # flush() called before file closed
999 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001000 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001001
1002 def test_close_error_on_close(self):
1003 raw = self.MockRawIO()
1004 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001005 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001006 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001007 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001008 raw.close = bad_close
1009 b = self.tp(raw)
1010 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001011 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001012 b.close()
1013 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001014 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001015 self.assertEqual(err.exception.__context__.args, ('flush',))
1016 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001017
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001018 def test_nonnormalized_close_error_on_close(self):
1019 # Issue #21677
1020 raw = self.MockRawIO()
1021 def bad_flush():
1022 raise non_existing_flush
1023 def bad_close():
1024 raise non_existing_close
1025 raw.close = bad_close
1026 b = self.tp(raw)
1027 b.flush = bad_flush
1028 with self.assertRaises(NameError) as err: # exception not swallowed
1029 b.close()
1030 self.assertIn('non_existing_close', str(err.exception))
1031 self.assertIsInstance(err.exception.__context__, NameError)
1032 self.assertIn('non_existing_flush', str(err.exception.__context__))
1033 self.assertFalse(b.closed)
1034
Antoine Pitrou6be88762010-05-03 16:48:20 +00001035 def test_multi_close(self):
1036 raw = self.MockRawIO()
1037 b = self.tp(raw)
1038 b.close()
1039 b.close()
1040 b.close()
1041 self.assertRaises(ValueError, b.flush)
1042
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001043 def test_unseekable(self):
1044 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1045 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1046 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1047
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001048 def test_readonly_attributes(self):
1049 raw = self.MockRawIO()
1050 buf = self.tp(raw)
1051 x = self.MockRawIO()
1052 with self.assertRaises(AttributeError):
1053 buf.raw = x
1054
Guido van Rossum78892e42007-04-06 17:31:18 +00001055
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001056class SizeofTest:
1057
1058 @support.cpython_only
1059 def test_sizeof(self):
1060 bufsize1 = 4096
1061 bufsize2 = 8192
1062 rawio = self.MockRawIO()
1063 bufio = self.tp(rawio, buffer_size=bufsize1)
1064 size = sys.getsizeof(bufio) - bufsize1
1065 rawio = self.MockRawIO()
1066 bufio = self.tp(rawio, buffer_size=bufsize2)
1067 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1068
Jesus Ceadc469452012-10-04 12:37:56 +02001069 @support.cpython_only
1070 def test_buffer_freeing(self) :
1071 bufsize = 4096
1072 rawio = self.MockRawIO()
1073 bufio = self.tp(rawio, buffer_size=bufsize)
1074 size = sys.getsizeof(bufio) - bufsize
1075 bufio.close()
1076 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001077
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001078class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1079 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081 def test_constructor(self):
1082 rawio = self.MockRawIO([b"abc"])
1083 bufio = self.tp(rawio)
1084 bufio.__init__(rawio)
1085 bufio.__init__(rawio, buffer_size=1024)
1086 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001087 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1089 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1090 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1091 rawio = self.MockRawIO([b"abc"])
1092 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001093 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001094
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001095 def test_uninitialized(self):
1096 bufio = self.tp.__new__(self.tp)
1097 del bufio
1098 bufio = self.tp.__new__(self.tp)
1099 self.assertRaisesRegex((ValueError, AttributeError),
1100 'uninitialized|has no attribute',
1101 bufio.read, 0)
1102 bufio.__init__(self.MockRawIO())
1103 self.assertEqual(bufio.read(0), b'')
1104
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001105 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001106 for arg in (None, 7):
1107 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1108 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001109 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 # Invalid args
1111 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001112
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001113 def test_read1(self):
1114 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1115 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001116 self.assertEqual(b"a", bufio.read(1))
1117 self.assertEqual(b"b", bufio.read1(1))
1118 self.assertEqual(rawio._reads, 1)
1119 self.assertEqual(b"c", bufio.read1(100))
1120 self.assertEqual(rawio._reads, 1)
1121 self.assertEqual(b"d", bufio.read1(100))
1122 self.assertEqual(rawio._reads, 2)
1123 self.assertEqual(b"efg", bufio.read1(100))
1124 self.assertEqual(rawio._reads, 3)
1125 self.assertEqual(b"", bufio.read1(100))
1126 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001127 # Invalid args
1128 self.assertRaises(ValueError, bufio.read1, -1)
1129
1130 def test_readinto(self):
1131 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1132 bufio = self.tp(rawio)
1133 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001134 self.assertEqual(bufio.readinto(b), 2)
1135 self.assertEqual(b, b"ab")
1136 self.assertEqual(bufio.readinto(b), 2)
1137 self.assertEqual(b, b"cd")
1138 self.assertEqual(bufio.readinto(b), 2)
1139 self.assertEqual(b, b"ef")
1140 self.assertEqual(bufio.readinto(b), 1)
1141 self.assertEqual(b, b"gf")
1142 self.assertEqual(bufio.readinto(b), 0)
1143 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001144 rawio = self.MockRawIO((b"abc", None))
1145 bufio = self.tp(rawio)
1146 self.assertEqual(bufio.readinto(b), 2)
1147 self.assertEqual(b, b"ab")
1148 self.assertEqual(bufio.readinto(b), 1)
1149 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001150
Benjamin Petersona96fea02014-06-22 14:17:44 -07001151 def test_readinto1(self):
1152 buffer_size = 10
1153 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1154 bufio = self.tp(rawio, buffer_size=buffer_size)
1155 b = bytearray(2)
1156 self.assertEqual(bufio.peek(3), b'abc')
1157 self.assertEqual(rawio._reads, 1)
1158 self.assertEqual(bufio.readinto1(b), 2)
1159 self.assertEqual(b, b"ab")
1160 self.assertEqual(rawio._reads, 1)
1161 self.assertEqual(bufio.readinto1(b), 1)
1162 self.assertEqual(b[:1], b"c")
1163 self.assertEqual(rawio._reads, 1)
1164 self.assertEqual(bufio.readinto1(b), 2)
1165 self.assertEqual(b, b"de")
1166 self.assertEqual(rawio._reads, 2)
1167 b = bytearray(2*buffer_size)
1168 self.assertEqual(bufio.peek(3), b'fgh')
1169 self.assertEqual(rawio._reads, 3)
1170 self.assertEqual(bufio.readinto1(b), 6)
1171 self.assertEqual(b[:6], b"fghjkl")
1172 self.assertEqual(rawio._reads, 4)
1173
1174 def test_readinto_array(self):
1175 buffer_size = 60
1176 data = b"a" * 26
1177 rawio = self.MockRawIO((data,))
1178 bufio = self.tp(rawio, buffer_size=buffer_size)
1179
1180 # Create an array with element size > 1 byte
1181 b = array.array('i', b'x' * 32)
1182 assert len(b) != 16
1183
1184 # Read into it. We should get as many *bytes* as we can fit into b
1185 # (which is more than the number of elements)
1186 n = bufio.readinto(b)
1187 self.assertGreater(n, len(b))
1188
1189 # Check that old contents of b are preserved
1190 bm = memoryview(b).cast('B')
1191 self.assertLess(n, len(bm))
1192 self.assertEqual(bm[:n], data[:n])
1193 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1194
1195 def test_readinto1_array(self):
1196 buffer_size = 60
1197 data = b"a" * 26
1198 rawio = self.MockRawIO((data,))
1199 bufio = self.tp(rawio, buffer_size=buffer_size)
1200
1201 # Create an array with element size > 1 byte
1202 b = array.array('i', b'x' * 32)
1203 assert len(b) != 16
1204
1205 # Read into it. We should get as many *bytes* as we can fit into b
1206 # (which is more than the number of elements)
1207 n = bufio.readinto1(b)
1208 self.assertGreater(n, len(b))
1209
1210 # Check that old contents of b are preserved
1211 bm = memoryview(b).cast('B')
1212 self.assertLess(n, len(bm))
1213 self.assertEqual(bm[:n], data[:n])
1214 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1215
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001216 def test_readlines(self):
1217 def bufio():
1218 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1219 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001220 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1221 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1222 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001224 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001225 data = b"abcdefghi"
1226 dlen = len(data)
1227
1228 tests = [
1229 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1230 [ 100, [ 3, 3, 3], [ dlen ] ],
1231 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1232 ]
1233
1234 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235 rawio = self.MockFileIO(data)
1236 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001237 pos = 0
1238 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001239 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001240 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001241 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001242 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001243
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001244 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001245 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001246 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1247 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001248 self.assertEqual(b"abcd", bufio.read(6))
1249 self.assertEqual(b"e", bufio.read(1))
1250 self.assertEqual(b"fg", bufio.read())
1251 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001252 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001253 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001254
Victor Stinnera80987f2011-05-25 22:47:16 +02001255 rawio = self.MockRawIO((b"a", None, None))
1256 self.assertEqual(b"a", rawio.readall())
1257 self.assertIsNone(rawio.readall())
1258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001259 def test_read_past_eof(self):
1260 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1261 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001262
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265 def test_read_all(self):
1266 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1267 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001268
Ezio Melottib3aedd42010-11-20 19:04:17 +00001269 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270
Victor Stinner45df8202010-04-28 22:31:17 +00001271 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001272 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001273 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001274 try:
1275 # Write out many bytes with exactly the same number of 0's,
1276 # 1's... 255's. This will help us check that concurrent reading
1277 # doesn't duplicate or forget contents.
1278 N = 1000
1279 l = list(range(256)) * N
1280 random.shuffle(l)
1281 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001282 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001283 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001284 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001285 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001286 errors = []
1287 results = []
1288 def f():
1289 try:
1290 # Intra-buffer read then buffer-flushing read
1291 for n in cycle([1, 19]):
1292 s = bufio.read(n)
1293 if not s:
1294 break
1295 # list.append() is atomic
1296 results.append(s)
1297 except Exception as e:
1298 errors.append(e)
1299 raise
1300 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001301 with support.start_threads(threads):
1302 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001303 self.assertFalse(errors,
1304 "the following exceptions were caught: %r" % errors)
1305 s = b''.join(results)
1306 for i in range(256):
1307 c = bytes(bytearray([i]))
1308 self.assertEqual(s.count(c), N)
1309 finally:
1310 support.unlink(support.TESTFN)
1311
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001312 def test_unseekable(self):
1313 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1314 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1315 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1316 bufio.read(1)
1317 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1318 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1319
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320 def test_misbehaved_io(self):
1321 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1322 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001323 self.assertRaises(OSError, bufio.seek, 0)
1324 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001325
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001326 def test_no_extraneous_read(self):
1327 # Issue #9550; when the raw IO object has satisfied the read request,
1328 # we should not issue any additional reads, otherwise it may block
1329 # (e.g. socket).
1330 bufsize = 16
1331 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1332 rawio = self.MockRawIO([b"x" * n])
1333 bufio = self.tp(rawio, bufsize)
1334 self.assertEqual(bufio.read(n), b"x" * n)
1335 # Simple case: one raw read is enough to satisfy the request.
1336 self.assertEqual(rawio._extraneous_reads, 0,
1337 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1338 # A more complex case where two raw reads are needed to satisfy
1339 # the request.
1340 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1341 bufio = self.tp(rawio, bufsize)
1342 self.assertEqual(bufio.read(n), b"x" * n)
1343 self.assertEqual(rawio._extraneous_reads, 0,
1344 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1345
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001346 def test_read_on_closed(self):
1347 # Issue #23796
1348 b = io.BufferedReader(io.BytesIO(b"12"))
1349 b.read(1)
1350 b.close()
1351 self.assertRaises(ValueError, b.peek)
1352 self.assertRaises(ValueError, b.read1, 1)
1353
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001354
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001355class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001356 tp = io.BufferedReader
1357
1358 def test_constructor(self):
1359 BufferedReaderTest.test_constructor(self)
1360 # The allocation can succeed on 32-bit builds, e.g. with more
1361 # than 2GB RAM and a 64-bit kernel.
1362 if sys.maxsize > 0x7FFFFFFF:
1363 rawio = self.MockRawIO()
1364 bufio = self.tp(rawio)
1365 self.assertRaises((OverflowError, MemoryError, ValueError),
1366 bufio.__init__, rawio, sys.maxsize)
1367
1368 def test_initialization(self):
1369 rawio = self.MockRawIO([b"abc"])
1370 bufio = self.tp(rawio)
1371 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1372 self.assertRaises(ValueError, bufio.read)
1373 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1374 self.assertRaises(ValueError, bufio.read)
1375 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1376 self.assertRaises(ValueError, bufio.read)
1377
1378 def test_misbehaved_io_read(self):
1379 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1380 bufio = self.tp(rawio)
1381 # _pyio.BufferedReader seems to implement reading different, so that
1382 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001383 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001384
1385 def test_garbage_collection(self):
1386 # C BufferedReader objects are collected.
1387 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001388 with support.check_warnings(('', ResourceWarning)):
1389 rawio = self.FileIO(support.TESTFN, "w+b")
1390 f = self.tp(rawio)
1391 f.f = f
1392 wr = weakref.ref(f)
1393 del f
1394 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001395 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001396
R David Murray67bfe802013-02-23 21:51:05 -05001397 def test_args_error(self):
1398 # Issue #17275
1399 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1400 self.tp(io.BytesIO(), 1024, 1024, 1024)
1401
1402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403class PyBufferedReaderTest(BufferedReaderTest):
1404 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001405
Guido van Rossuma9e20242007-03-08 00:43:48 +00001406
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001407class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1408 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001409
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410 def test_constructor(self):
1411 rawio = self.MockRawIO()
1412 bufio = self.tp(rawio)
1413 bufio.__init__(rawio)
1414 bufio.__init__(rawio, buffer_size=1024)
1415 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001416 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001417 bufio.flush()
1418 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1419 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1420 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1421 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001422 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001424 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001426 def test_uninitialized(self):
1427 bufio = self.tp.__new__(self.tp)
1428 del bufio
1429 bufio = self.tp.__new__(self.tp)
1430 self.assertRaisesRegex((ValueError, AttributeError),
1431 'uninitialized|has no attribute',
1432 bufio.write, b'')
1433 bufio.__init__(self.MockRawIO())
1434 self.assertEqual(bufio.write(b''), 0)
1435
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001436 def test_detach_flush(self):
1437 raw = self.MockRawIO()
1438 buf = self.tp(raw)
1439 buf.write(b"howdy!")
1440 self.assertFalse(raw._write_stack)
1441 buf.detach()
1442 self.assertEqual(raw._write_stack, [b"howdy!"])
1443
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001444 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001445 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001446 writer = self.MockRawIO()
1447 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001448 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001449 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001450 buffer = bytearray(b"def")
1451 bufio.write(buffer)
1452 buffer[:] = b"***" # Overwrite our copy of the data
1453 bufio.flush()
1454 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001455
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001456 def test_write_overflow(self):
1457 writer = self.MockRawIO()
1458 bufio = self.tp(writer, 8)
1459 contents = b"abcdefghijklmnop"
1460 for n in range(0, len(contents), 3):
1461 bufio.write(contents[n:n+3])
1462 flushed = b"".join(writer._write_stack)
1463 # At least (total - 8) bytes were implicitly flushed, perhaps more
1464 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001465 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001466
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001467 def check_writes(self, intermediate_func):
1468 # Lots of writes, test the flushed output is as expected.
1469 contents = bytes(range(256)) * 1000
1470 n = 0
1471 writer = self.MockRawIO()
1472 bufio = self.tp(writer, 13)
1473 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1474 def gen_sizes():
1475 for size in count(1):
1476 for i in range(15):
1477 yield size
1478 sizes = gen_sizes()
1479 while n < len(contents):
1480 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001481 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001482 intermediate_func(bufio)
1483 n += size
1484 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001485 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487 def test_writes(self):
1488 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001489
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001490 def test_writes_and_flushes(self):
1491 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001492
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001493 def test_writes_and_seeks(self):
1494 def _seekabs(bufio):
1495 pos = bufio.tell()
1496 bufio.seek(pos + 1, 0)
1497 bufio.seek(pos - 1, 0)
1498 bufio.seek(pos, 0)
1499 self.check_writes(_seekabs)
1500 def _seekrel(bufio):
1501 pos = bufio.seek(0, 1)
1502 bufio.seek(+1, 1)
1503 bufio.seek(-1, 1)
1504 bufio.seek(pos, 0)
1505 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001506
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001507 def test_writes_and_truncates(self):
1508 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001510 def test_write_non_blocking(self):
1511 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001512 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001513
Ezio Melottib3aedd42010-11-20 19:04:17 +00001514 self.assertEqual(bufio.write(b"abcd"), 4)
1515 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001516 # 1 byte will be written, the rest will be buffered
1517 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001518 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001520 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1521 raw.block_on(b"0")
1522 try:
1523 bufio.write(b"opqrwxyz0123456789")
1524 except self.BlockingIOError as e:
1525 written = e.characters_written
1526 else:
1527 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001528 self.assertEqual(written, 16)
1529 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001530 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001531
Ezio Melottib3aedd42010-11-20 19:04:17 +00001532 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 s = raw.pop_written()
1534 # Previously buffered bytes were flushed
1535 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001536
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001537 def test_write_and_rewind(self):
1538 raw = io.BytesIO()
1539 bufio = self.tp(raw, 4)
1540 self.assertEqual(bufio.write(b"abcdef"), 6)
1541 self.assertEqual(bufio.tell(), 6)
1542 bufio.seek(0, 0)
1543 self.assertEqual(bufio.write(b"XY"), 2)
1544 bufio.seek(6, 0)
1545 self.assertEqual(raw.getvalue(), b"XYcdef")
1546 self.assertEqual(bufio.write(b"123456"), 6)
1547 bufio.flush()
1548 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 def test_flush(self):
1551 writer = self.MockRawIO()
1552 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001553 bufio.write(b"abc")
1554 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001555 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001556
Antoine Pitrou131a4892012-10-16 22:57:11 +02001557 def test_writelines(self):
1558 l = [b'ab', b'cd', b'ef']
1559 writer = self.MockRawIO()
1560 bufio = self.tp(writer, 8)
1561 bufio.writelines(l)
1562 bufio.flush()
1563 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1564
1565 def test_writelines_userlist(self):
1566 l = UserList([b'ab', b'cd', b'ef'])
1567 writer = self.MockRawIO()
1568 bufio = self.tp(writer, 8)
1569 bufio.writelines(l)
1570 bufio.flush()
1571 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1572
1573 def test_writelines_error(self):
1574 writer = self.MockRawIO()
1575 bufio = self.tp(writer, 8)
1576 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1577 self.assertRaises(TypeError, bufio.writelines, None)
1578 self.assertRaises(TypeError, bufio.writelines, 'abc')
1579
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001580 def test_destructor(self):
1581 writer = self.MockRawIO()
1582 bufio = self.tp(writer, 8)
1583 bufio.write(b"abc")
1584 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001585 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001586 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001587
1588 def test_truncate(self):
1589 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001590 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 bufio = self.tp(raw, 8)
1592 bufio.write(b"abcdef")
1593 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001594 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001595 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001596 self.assertEqual(f.read(), b"abc")
1597
Victor Stinner45df8202010-04-28 22:31:17 +00001598 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001599 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001600 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001601 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 # Write out many bytes from many threads and test they were
1603 # all flushed.
1604 N = 1000
1605 contents = bytes(range(256)) * N
1606 sizes = cycle([1, 19])
1607 n = 0
1608 queue = deque()
1609 while n < len(contents):
1610 size = next(sizes)
1611 queue.append(contents[n:n+size])
1612 n += size
1613 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001614 # We use a real file object because it allows us to
1615 # exercise situations where the GIL is released before
1616 # writing the buffer to the raw streams. This is in addition
1617 # to concurrency issues due to switching threads in the middle
1618 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001619 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001620 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001621 errors = []
1622 def f():
1623 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 while True:
1625 try:
1626 s = queue.popleft()
1627 except IndexError:
1628 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001629 bufio.write(s)
1630 except Exception as e:
1631 errors.append(e)
1632 raise
1633 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001634 with support.start_threads(threads):
1635 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001636 self.assertFalse(errors,
1637 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001638 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001639 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640 s = f.read()
1641 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001642 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001643 finally:
1644 support.unlink(support.TESTFN)
1645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001646 def test_misbehaved_io(self):
1647 rawio = self.MisbehavedRawIO()
1648 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001649 self.assertRaises(OSError, bufio.seek, 0)
1650 self.assertRaises(OSError, bufio.tell)
1651 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001652
Florent Xicluna109d5732012-07-07 17:03:22 +02001653 def test_max_buffer_size_removal(self):
1654 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001655 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001656
Benjamin Peterson68623612012-12-20 11:53:11 -06001657 def test_write_error_on_close(self):
1658 raw = self.MockRawIO()
1659 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001660 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001661 raw.write = bad_write
1662 b = self.tp(raw)
1663 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001664 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001665 self.assertTrue(b.closed)
1666
Benjamin Peterson59406a92009-03-26 17:10:29 +00001667
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001668class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001669 tp = io.BufferedWriter
1670
1671 def test_constructor(self):
1672 BufferedWriterTest.test_constructor(self)
1673 # The allocation can succeed on 32-bit builds, e.g. with more
1674 # than 2GB RAM and a 64-bit kernel.
1675 if sys.maxsize > 0x7FFFFFFF:
1676 rawio = self.MockRawIO()
1677 bufio = self.tp(rawio)
1678 self.assertRaises((OverflowError, MemoryError, ValueError),
1679 bufio.__init__, rawio, sys.maxsize)
1680
1681 def test_initialization(self):
1682 rawio = self.MockRawIO()
1683 bufio = self.tp(rawio)
1684 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1685 self.assertRaises(ValueError, bufio.write, b"def")
1686 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1687 self.assertRaises(ValueError, bufio.write, b"def")
1688 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1689 self.assertRaises(ValueError, bufio.write, b"def")
1690
1691 def test_garbage_collection(self):
1692 # C BufferedWriter objects are collected, and collecting them flushes
1693 # all data to disk.
1694 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001695 with support.check_warnings(('', ResourceWarning)):
1696 rawio = self.FileIO(support.TESTFN, "w+b")
1697 f = self.tp(rawio)
1698 f.write(b"123xxx")
1699 f.x = f
1700 wr = weakref.ref(f)
1701 del f
1702 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001703 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001704 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001705 self.assertEqual(f.read(), b"123xxx")
1706
R David Murray67bfe802013-02-23 21:51:05 -05001707 def test_args_error(self):
1708 # Issue #17275
1709 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1710 self.tp(io.BytesIO(), 1024, 1024, 1024)
1711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712
1713class PyBufferedWriterTest(BufferedWriterTest):
1714 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001715
Guido van Rossum01a27522007-03-07 01:00:12 +00001716class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001717
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001718 def test_constructor(self):
1719 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001720 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001721
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001722 def test_uninitialized(self):
1723 pair = self.tp.__new__(self.tp)
1724 del pair
1725 pair = self.tp.__new__(self.tp)
1726 self.assertRaisesRegex((ValueError, AttributeError),
1727 'uninitialized|has no attribute',
1728 pair.read, 0)
1729 self.assertRaisesRegex((ValueError, AttributeError),
1730 'uninitialized|has no attribute',
1731 pair.write, b'')
1732 pair.__init__(self.MockRawIO(), self.MockRawIO())
1733 self.assertEqual(pair.read(0), b'')
1734 self.assertEqual(pair.write(b''), 0)
1735
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001736 def test_detach(self):
1737 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1738 self.assertRaises(self.UnsupportedOperation, pair.detach)
1739
Florent Xicluna109d5732012-07-07 17:03:22 +02001740 def test_constructor_max_buffer_size_removal(self):
1741 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001742 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001743
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001744 def test_constructor_with_not_readable(self):
1745 class NotReadable(MockRawIO):
1746 def readable(self):
1747 return False
1748
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001749 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001750
1751 def test_constructor_with_not_writeable(self):
1752 class NotWriteable(MockRawIO):
1753 def writable(self):
1754 return False
1755
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001756 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001757
1758 def test_read(self):
1759 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1760
1761 self.assertEqual(pair.read(3), b"abc")
1762 self.assertEqual(pair.read(1), b"d")
1763 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001764 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1765 self.assertEqual(pair.read(None), b"abc")
1766
1767 def test_readlines(self):
1768 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1769 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1770 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1771 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001772
1773 def test_read1(self):
1774 # .read1() is delegated to the underlying reader object, so this test
1775 # can be shallow.
1776 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1777
1778 self.assertEqual(pair.read1(3), b"abc")
1779
1780 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001781 for method in ("readinto", "readinto1"):
1782 with self.subTest(method):
1783 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001784
Martin Panter6bb91f32016-05-28 00:41:57 +00001785 data = byteslike(5)
1786 self.assertEqual(getattr(pair, method)(data), 5)
1787 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001788
1789 def test_write(self):
1790 w = self.MockRawIO()
1791 pair = self.tp(self.MockRawIO(), w)
1792
1793 pair.write(b"abc")
1794 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001795 buffer = bytearray(b"def")
1796 pair.write(buffer)
1797 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001798 pair.flush()
1799 self.assertEqual(w._write_stack, [b"abc", b"def"])
1800
1801 def test_peek(self):
1802 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1803
1804 self.assertTrue(pair.peek(3).startswith(b"abc"))
1805 self.assertEqual(pair.read(3), b"abc")
1806
1807 def test_readable(self):
1808 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1809 self.assertTrue(pair.readable())
1810
1811 def test_writeable(self):
1812 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1813 self.assertTrue(pair.writable())
1814
1815 def test_seekable(self):
1816 # BufferedRWPairs are never seekable, even if their readers and writers
1817 # are.
1818 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1819 self.assertFalse(pair.seekable())
1820
1821 # .flush() is delegated to the underlying writer object and has been
1822 # tested in the test_write method.
1823
1824 def test_close_and_closed(self):
1825 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1826 self.assertFalse(pair.closed)
1827 pair.close()
1828 self.assertTrue(pair.closed)
1829
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001830 def test_reader_close_error_on_close(self):
1831 def reader_close():
1832 reader_non_existing
1833 reader = self.MockRawIO()
1834 reader.close = reader_close
1835 writer = self.MockRawIO()
1836 pair = self.tp(reader, writer)
1837 with self.assertRaises(NameError) as err:
1838 pair.close()
1839 self.assertIn('reader_non_existing', str(err.exception))
1840 self.assertTrue(pair.closed)
1841 self.assertFalse(reader.closed)
1842 self.assertTrue(writer.closed)
1843
1844 def test_writer_close_error_on_close(self):
1845 def writer_close():
1846 writer_non_existing
1847 reader = self.MockRawIO()
1848 writer = self.MockRawIO()
1849 writer.close = writer_close
1850 pair = self.tp(reader, writer)
1851 with self.assertRaises(NameError) as err:
1852 pair.close()
1853 self.assertIn('writer_non_existing', str(err.exception))
1854 self.assertFalse(pair.closed)
1855 self.assertTrue(reader.closed)
1856 self.assertFalse(writer.closed)
1857
1858 def test_reader_writer_close_error_on_close(self):
1859 def reader_close():
1860 reader_non_existing
1861 def writer_close():
1862 writer_non_existing
1863 reader = self.MockRawIO()
1864 reader.close = reader_close
1865 writer = self.MockRawIO()
1866 writer.close = writer_close
1867 pair = self.tp(reader, writer)
1868 with self.assertRaises(NameError) as err:
1869 pair.close()
1870 self.assertIn('reader_non_existing', str(err.exception))
1871 self.assertIsInstance(err.exception.__context__, NameError)
1872 self.assertIn('writer_non_existing', str(err.exception.__context__))
1873 self.assertFalse(pair.closed)
1874 self.assertFalse(reader.closed)
1875 self.assertFalse(writer.closed)
1876
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001877 def test_isatty(self):
1878 class SelectableIsAtty(MockRawIO):
1879 def __init__(self, isatty):
1880 MockRawIO.__init__(self)
1881 self._isatty = isatty
1882
1883 def isatty(self):
1884 return self._isatty
1885
1886 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1887 self.assertFalse(pair.isatty())
1888
1889 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1890 self.assertTrue(pair.isatty())
1891
1892 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1893 self.assertTrue(pair.isatty())
1894
1895 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1896 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001897
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001898 def test_weakref_clearing(self):
1899 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1900 ref = weakref.ref(brw)
1901 brw = None
1902 ref = None # Shouldn't segfault.
1903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001904class CBufferedRWPairTest(BufferedRWPairTest):
1905 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001906
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907class PyBufferedRWPairTest(BufferedRWPairTest):
1908 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001909
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001910
1911class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1912 read_mode = "rb+"
1913 write_mode = "wb+"
1914
1915 def test_constructor(self):
1916 BufferedReaderTest.test_constructor(self)
1917 BufferedWriterTest.test_constructor(self)
1918
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001919 def test_uninitialized(self):
1920 BufferedReaderTest.test_uninitialized(self)
1921 BufferedWriterTest.test_uninitialized(self)
1922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001923 def test_read_and_write(self):
1924 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001925 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001926
1927 self.assertEqual(b"as", rw.read(2))
1928 rw.write(b"ddd")
1929 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001930 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001931 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001932 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 def test_seek_and_tell(self):
1935 raw = self.BytesIO(b"asdfghjkl")
1936 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001937
Ezio Melottib3aedd42010-11-20 19:04:17 +00001938 self.assertEqual(b"as", rw.read(2))
1939 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001940 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001941 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001942
Antoine Pitroue05565e2011-08-20 14:39:23 +02001943 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001944 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001945 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001946 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001947 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001948 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001949 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001950 self.assertEqual(7, rw.tell())
1951 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001952 rw.flush()
1953 self.assertEqual(b"asdf123fl", raw.getvalue())
1954
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001955 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001957 def check_flush_and_read(self, read_func):
1958 raw = self.BytesIO(b"abcdefghi")
1959 bufio = self.tp(raw)
1960
Ezio Melottib3aedd42010-11-20 19:04:17 +00001961 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001962 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual(b"ef", read_func(bufio, 2))
1964 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001965 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001966 self.assertEqual(6, bufio.tell())
1967 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001968 raw.seek(0, 0)
1969 raw.write(b"XYZ")
1970 # flush() resets the read buffer
1971 bufio.flush()
1972 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001973 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001974
1975 def test_flush_and_read(self):
1976 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1977
1978 def test_flush_and_readinto(self):
1979 def _readinto(bufio, n=-1):
1980 b = bytearray(n if n >= 0 else 9999)
1981 n = bufio.readinto(b)
1982 return bytes(b[:n])
1983 self.check_flush_and_read(_readinto)
1984
1985 def test_flush_and_peek(self):
1986 def _peek(bufio, n=-1):
1987 # This relies on the fact that the buffer can contain the whole
1988 # raw stream, otherwise peek() can return less.
1989 b = bufio.peek(n)
1990 if n != -1:
1991 b = b[:n]
1992 bufio.seek(len(b), 1)
1993 return b
1994 self.check_flush_and_read(_peek)
1995
1996 def test_flush_and_write(self):
1997 raw = self.BytesIO(b"abcdefghi")
1998 bufio = self.tp(raw)
1999
2000 bufio.write(b"123")
2001 bufio.flush()
2002 bufio.write(b"45")
2003 bufio.flush()
2004 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002005 self.assertEqual(b"12345fghi", raw.getvalue())
2006 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007
2008 def test_threads(self):
2009 BufferedReaderTest.test_threads(self)
2010 BufferedWriterTest.test_threads(self)
2011
2012 def test_writes_and_peek(self):
2013 def _peek(bufio):
2014 bufio.peek(1)
2015 self.check_writes(_peek)
2016 def _peek(bufio):
2017 pos = bufio.tell()
2018 bufio.seek(-1, 1)
2019 bufio.peek(1)
2020 bufio.seek(pos, 0)
2021 self.check_writes(_peek)
2022
2023 def test_writes_and_reads(self):
2024 def _read(bufio):
2025 bufio.seek(-1, 1)
2026 bufio.read(1)
2027 self.check_writes(_read)
2028
2029 def test_writes_and_read1s(self):
2030 def _read1(bufio):
2031 bufio.seek(-1, 1)
2032 bufio.read1(1)
2033 self.check_writes(_read1)
2034
2035 def test_writes_and_readintos(self):
2036 def _read(bufio):
2037 bufio.seek(-1, 1)
2038 bufio.readinto(bytearray(1))
2039 self.check_writes(_read)
2040
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002041 def test_write_after_readahead(self):
2042 # Issue #6629: writing after the buffer was filled by readahead should
2043 # first rewind the raw stream.
2044 for overwrite_size in [1, 5]:
2045 raw = self.BytesIO(b"A" * 10)
2046 bufio = self.tp(raw, 4)
2047 # Trigger readahead
2048 self.assertEqual(bufio.read(1), b"A")
2049 self.assertEqual(bufio.tell(), 1)
2050 # Overwriting should rewind the raw stream if it needs so
2051 bufio.write(b"B" * overwrite_size)
2052 self.assertEqual(bufio.tell(), overwrite_size + 1)
2053 # If the write size was smaller than the buffer size, flush() and
2054 # check that rewind happens.
2055 bufio.flush()
2056 self.assertEqual(bufio.tell(), overwrite_size + 1)
2057 s = raw.getvalue()
2058 self.assertEqual(s,
2059 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2060
Antoine Pitrou7c404892011-05-13 00:13:33 +02002061 def test_write_rewind_write(self):
2062 # Various combinations of reading / writing / seeking backwards / writing again
2063 def mutate(bufio, pos1, pos2):
2064 assert pos2 >= pos1
2065 # Fill the buffer
2066 bufio.seek(pos1)
2067 bufio.read(pos2 - pos1)
2068 bufio.write(b'\x02')
2069 # This writes earlier than the previous write, but still inside
2070 # the buffer.
2071 bufio.seek(pos1)
2072 bufio.write(b'\x01')
2073
2074 b = b"\x80\x81\x82\x83\x84"
2075 for i in range(0, len(b)):
2076 for j in range(i, len(b)):
2077 raw = self.BytesIO(b)
2078 bufio = self.tp(raw, 100)
2079 mutate(bufio, i, j)
2080 bufio.flush()
2081 expected = bytearray(b)
2082 expected[j] = 2
2083 expected[i] = 1
2084 self.assertEqual(raw.getvalue(), expected,
2085 "failed result for i=%d, j=%d" % (i, j))
2086
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002087 def test_truncate_after_read_or_write(self):
2088 raw = self.BytesIO(b"A" * 10)
2089 bufio = self.tp(raw, 100)
2090 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2091 self.assertEqual(bufio.truncate(), 2)
2092 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2093 self.assertEqual(bufio.truncate(), 4)
2094
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 def test_misbehaved_io(self):
2096 BufferedReaderTest.test_misbehaved_io(self)
2097 BufferedWriterTest.test_misbehaved_io(self)
2098
Antoine Pitroue05565e2011-08-20 14:39:23 +02002099 def test_interleaved_read_write(self):
2100 # Test for issue #12213
2101 with self.BytesIO(b'abcdefgh') as raw:
2102 with self.tp(raw, 100) as f:
2103 f.write(b"1")
2104 self.assertEqual(f.read(1), b'b')
2105 f.write(b'2')
2106 self.assertEqual(f.read1(1), b'd')
2107 f.write(b'3')
2108 buf = bytearray(1)
2109 f.readinto(buf)
2110 self.assertEqual(buf, b'f')
2111 f.write(b'4')
2112 self.assertEqual(f.peek(1), b'h')
2113 f.flush()
2114 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2115
2116 with self.BytesIO(b'abc') as raw:
2117 with self.tp(raw, 100) as f:
2118 self.assertEqual(f.read(1), b'a')
2119 f.write(b"2")
2120 self.assertEqual(f.read(1), b'c')
2121 f.flush()
2122 self.assertEqual(raw.getvalue(), b'a2c')
2123
2124 def test_interleaved_readline_write(self):
2125 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2126 with self.tp(raw) as f:
2127 f.write(b'1')
2128 self.assertEqual(f.readline(), b'b\n')
2129 f.write(b'2')
2130 self.assertEqual(f.readline(), b'def\n')
2131 f.write(b'3')
2132 self.assertEqual(f.readline(), b'\n')
2133 f.flush()
2134 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2135
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002136 # You can't construct a BufferedRandom over a non-seekable stream.
2137 test_unseekable = None
2138
R David Murray67bfe802013-02-23 21:51:05 -05002139
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002140class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002141 tp = io.BufferedRandom
2142
2143 def test_constructor(self):
2144 BufferedRandomTest.test_constructor(self)
2145 # The allocation can succeed on 32-bit builds, e.g. with more
2146 # than 2GB RAM and a 64-bit kernel.
2147 if sys.maxsize > 0x7FFFFFFF:
2148 rawio = self.MockRawIO()
2149 bufio = self.tp(rawio)
2150 self.assertRaises((OverflowError, MemoryError, ValueError),
2151 bufio.__init__, rawio, sys.maxsize)
2152
2153 def test_garbage_collection(self):
2154 CBufferedReaderTest.test_garbage_collection(self)
2155 CBufferedWriterTest.test_garbage_collection(self)
2156
R David Murray67bfe802013-02-23 21:51:05 -05002157 def test_args_error(self):
2158 # Issue #17275
2159 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2160 self.tp(io.BytesIO(), 1024, 1024, 1024)
2161
2162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002163class PyBufferedRandomTest(BufferedRandomTest):
2164 tp = pyio.BufferedRandom
2165
2166
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002167# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2168# properties:
2169# - A single output character can correspond to many bytes of input.
2170# - The number of input bytes to complete the character can be
2171# undetermined until the last input byte is received.
2172# - The number of input bytes can vary depending on previous input.
2173# - A single input byte can correspond to many characters of output.
2174# - The number of output characters can be undetermined until the
2175# last input byte is received.
2176# - The number of output characters can vary depending on previous input.
2177
2178class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2179 """
2180 For testing seek/tell behavior with a stateful, buffering decoder.
2181
2182 Input is a sequence of words. Words may be fixed-length (length set
2183 by input) or variable-length (period-terminated). In variable-length
2184 mode, extra periods are ignored. Possible words are:
2185 - 'i' followed by a number sets the input length, I (maximum 99).
2186 When I is set to 0, words are space-terminated.
2187 - 'o' followed by a number sets the output length, O (maximum 99).
2188 - Any other word is converted into a word followed by a period on
2189 the output. The output word consists of the input word truncated
2190 or padded out with hyphens to make its length equal to O. If O
2191 is 0, the word is output verbatim without truncating or padding.
2192 I and O are initially set to 1. When I changes, any buffered input is
2193 re-scanned according to the new I. EOF also terminates the last word.
2194 """
2195
2196 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002197 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002198 self.reset()
2199
2200 def __repr__(self):
2201 return '<SID %x>' % id(self)
2202
2203 def reset(self):
2204 self.i = 1
2205 self.o = 1
2206 self.buffer = bytearray()
2207
2208 def getstate(self):
2209 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2210 return bytes(self.buffer), i*100 + o
2211
2212 def setstate(self, state):
2213 buffer, io = state
2214 self.buffer = bytearray(buffer)
2215 i, o = divmod(io, 100)
2216 self.i, self.o = i ^ 1, o ^ 1
2217
2218 def decode(self, input, final=False):
2219 output = ''
2220 for b in input:
2221 if self.i == 0: # variable-length, terminated with period
2222 if b == ord('.'):
2223 if self.buffer:
2224 output += self.process_word()
2225 else:
2226 self.buffer.append(b)
2227 else: # fixed-length, terminate after self.i bytes
2228 self.buffer.append(b)
2229 if len(self.buffer) == self.i:
2230 output += self.process_word()
2231 if final and self.buffer: # EOF terminates the last word
2232 output += self.process_word()
2233 return output
2234
2235 def process_word(self):
2236 output = ''
2237 if self.buffer[0] == ord('i'):
2238 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2239 elif self.buffer[0] == ord('o'):
2240 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2241 else:
2242 output = self.buffer.decode('ascii')
2243 if len(output) < self.o:
2244 output += '-'*self.o # pad out with hyphens
2245 if self.o:
2246 output = output[:self.o] # truncate to output length
2247 output += '.'
2248 self.buffer = bytearray()
2249 return output
2250
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002251 codecEnabled = False
2252
2253 @classmethod
2254 def lookupTestDecoder(cls, name):
2255 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002256 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002257 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002258 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002259 incrementalencoder=None,
2260 streamreader=None, streamwriter=None,
2261 incrementaldecoder=cls)
2262
2263# Register the previous decoder for testing.
2264# Disabled by default, tests will enable it.
2265codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2266
2267
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002268class StatefulIncrementalDecoderTest(unittest.TestCase):
2269 """
2270 Make sure the StatefulIncrementalDecoder actually works.
2271 """
2272
2273 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002274 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002275 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002276 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002277 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002278 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002279 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002280 # I=0, O=6 (variable-length input, fixed-length output)
2281 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2282 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002283 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002284 # I=6, O=3 (fixed-length input > fixed-length output)
2285 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2286 # I=0, then 3; O=29, then 15 (with longer output)
2287 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2288 'a----------------------------.' +
2289 'b----------------------------.' +
2290 'cde--------------------------.' +
2291 'abcdefghijabcde.' +
2292 'a.b------------.' +
2293 '.c.------------.' +
2294 'd.e------------.' +
2295 'k--------------.' +
2296 'l--------------.' +
2297 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002298 ]
2299
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002300 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002301 # Try a few one-shot test cases.
2302 for input, eof, output in self.test_cases:
2303 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002304 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002305
2306 # Also test an unfinished decode, followed by forcing EOF.
2307 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002308 self.assertEqual(d.decode(b'oiabcd'), '')
2309 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002310
2311class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002312
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002313 def setUp(self):
2314 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2315 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002316 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002317
Guido van Rossumd0712812007-04-11 16:32:43 +00002318 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002319 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002320
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 def test_constructor(self):
2322 r = self.BytesIO(b"\xc3\xa9\n\n")
2323 b = self.BufferedReader(r, 1000)
2324 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002325 t.__init__(b, encoding="latin-1", newline="\r\n")
2326 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002327 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002328 t.__init__(b, encoding="utf-8", line_buffering=True)
2329 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002330 self.assertEqual(t.line_buffering, True)
2331 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002332 self.assertRaises(TypeError, t.__init__, b, newline=42)
2333 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2334
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002335 def test_uninitialized(self):
2336 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2337 del t
2338 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2339 self.assertRaises(Exception, repr, t)
2340 self.assertRaisesRegex((ValueError, AttributeError),
2341 'uninitialized|has no attribute',
2342 t.read, 0)
2343 t.__init__(self.MockRawIO())
2344 self.assertEqual(t.read(0), '')
2345
Nick Coghlana9b15242014-02-04 22:11:18 +10002346 def test_non_text_encoding_codecs_are_rejected(self):
2347 # Ensure the constructor complains if passed a codec that isn't
2348 # marked as a text encoding
2349 # http://bugs.python.org/issue20404
2350 r = self.BytesIO()
2351 b = self.BufferedWriter(r)
2352 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2353 self.TextIOWrapper(b, encoding="hex")
2354
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002355 def test_detach(self):
2356 r = self.BytesIO()
2357 b = self.BufferedWriter(r)
2358 t = self.TextIOWrapper(b)
2359 self.assertIs(t.detach(), b)
2360
2361 t = self.TextIOWrapper(b, encoding="ascii")
2362 t.write("howdy")
2363 self.assertFalse(r.getvalue())
2364 t.detach()
2365 self.assertEqual(r.getvalue(), b"howdy")
2366 self.assertRaises(ValueError, t.detach)
2367
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002368 # Operations independent of the detached stream should still work
2369 repr(t)
2370 self.assertEqual(t.encoding, "ascii")
2371 self.assertEqual(t.errors, "strict")
2372 self.assertFalse(t.line_buffering)
2373
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002374 def test_repr(self):
2375 raw = self.BytesIO("hello".encode("utf-8"))
2376 b = self.BufferedReader(raw)
2377 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002378 modname = self.TextIOWrapper.__module__
2379 self.assertEqual(repr(t),
2380 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2381 raw.name = "dummy"
2382 self.assertEqual(repr(t),
2383 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002384 t.mode = "r"
2385 self.assertEqual(repr(t),
2386 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002387 raw.name = b"dummy"
2388 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002389 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002390
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002391 t.buffer.detach()
2392 repr(t) # Should not raise an exception
2393
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002394 def test_line_buffering(self):
2395 r = self.BytesIO()
2396 b = self.BufferedWriter(r, 1000)
2397 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002398 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002399 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002400 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002401 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002402 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002403 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002404
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002405 def test_default_encoding(self):
2406 old_environ = dict(os.environ)
2407 try:
2408 # try to get a user preferred encoding different than the current
2409 # locale encoding to check that TextIOWrapper() uses the current
2410 # locale encoding and not the user preferred encoding
2411 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2412 if key in os.environ:
2413 del os.environ[key]
2414
2415 current_locale_encoding = locale.getpreferredencoding(False)
2416 b = self.BytesIO()
2417 t = self.TextIOWrapper(b)
2418 self.assertEqual(t.encoding, current_locale_encoding)
2419 finally:
2420 os.environ.clear()
2421 os.environ.update(old_environ)
2422
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002423 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002424 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002425 # Issue 15989
2426 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002427 b = self.BytesIO()
2428 b.fileno = lambda: _testcapi.INT_MAX + 1
2429 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2430 b.fileno = lambda: _testcapi.UINT_MAX + 1
2431 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2432
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002433 def test_encoding(self):
2434 # Check the encoding attribute is always set, and valid
2435 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002436 t = self.TextIOWrapper(b, encoding="utf-8")
2437 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002438 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002439 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002440 codecs.lookup(t.encoding)
2441
2442 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002443 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002444 b = self.BytesIO(b"abc\n\xff\n")
2445 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002446 self.assertRaises(UnicodeError, t.read)
2447 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002448 b = self.BytesIO(b"abc\n\xff\n")
2449 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002450 self.assertRaises(UnicodeError, t.read)
2451 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002452 b = self.BytesIO(b"abc\n\xff\n")
2453 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002454 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002455 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002456 b = self.BytesIO(b"abc\n\xff\n")
2457 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002458 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002459
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002460 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002461 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002462 b = self.BytesIO()
2463 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002464 self.assertRaises(UnicodeError, t.write, "\xff")
2465 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002466 b = self.BytesIO()
2467 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002468 self.assertRaises(UnicodeError, t.write, "\xff")
2469 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002470 b = self.BytesIO()
2471 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002472 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002473 t.write("abc\xffdef\n")
2474 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002476 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002477 b = self.BytesIO()
2478 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002479 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002480 t.write("abc\xffdef\n")
2481 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002482 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002483
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002484 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002485 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2486
2487 tests = [
2488 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002489 [ '', input_lines ],
2490 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2491 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2492 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002493 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002494 encodings = (
2495 'utf-8', 'latin-1',
2496 'utf-16', 'utf-16-le', 'utf-16-be',
2497 'utf-32', 'utf-32-le', 'utf-32-be',
2498 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002499
Guido van Rossum8358db22007-08-18 21:39:55 +00002500 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002501 # character in TextIOWrapper._pending_line.
2502 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002503 # XXX: str.encode() should return bytes
2504 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002505 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002506 for bufsize in range(1, 10):
2507 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002508 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2509 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002510 encoding=encoding)
2511 if do_reads:
2512 got_lines = []
2513 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002514 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002515 if c2 == '':
2516 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002517 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002518 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002519 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002520 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002521
2522 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002523 self.assertEqual(got_line, exp_line)
2524 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002526 def test_newlines_input(self):
2527 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002528 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2529 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002530 (None, normalized.decode("ascii").splitlines(keepends=True)),
2531 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002532 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2533 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2534 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002535 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002536 buf = self.BytesIO(testdata)
2537 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002538 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002539 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002540 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002541
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002542 def test_newlines_output(self):
2543 testdict = {
2544 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2545 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2546 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2547 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2548 }
2549 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2550 for newline, expected in tests:
2551 buf = self.BytesIO()
2552 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2553 txt.write("AAA\nB")
2554 txt.write("BB\nCCC\n")
2555 txt.write("X\rY\r\nZ")
2556 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002557 self.assertEqual(buf.closed, False)
2558 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002559
2560 def test_destructor(self):
2561 l = []
2562 base = self.BytesIO
2563 class MyBytesIO(base):
2564 def close(self):
2565 l.append(self.getvalue())
2566 base.close(self)
2567 b = MyBytesIO()
2568 t = self.TextIOWrapper(b, encoding="ascii")
2569 t.write("abc")
2570 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002571 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002572 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002573
2574 def test_override_destructor(self):
2575 record = []
2576 class MyTextIO(self.TextIOWrapper):
2577 def __del__(self):
2578 record.append(1)
2579 try:
2580 f = super().__del__
2581 except AttributeError:
2582 pass
2583 else:
2584 f()
2585 def close(self):
2586 record.append(2)
2587 super().close()
2588 def flush(self):
2589 record.append(3)
2590 super().flush()
2591 b = self.BytesIO()
2592 t = MyTextIO(b, encoding="ascii")
2593 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002594 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002595 self.assertEqual(record, [1, 2, 3])
2596
2597 def test_error_through_destructor(self):
2598 # Test that the exception state is not modified by a destructor,
2599 # even if close() fails.
2600 rawio = self.CloseFailureIO()
2601 def f():
2602 self.TextIOWrapper(rawio).xyzzy
2603 with support.captured_output("stderr") as s:
2604 self.assertRaises(AttributeError, f)
2605 s = s.getvalue().strip()
2606 if s:
2607 # The destructor *may* have printed an unraisable error, check it
2608 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002609 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002610 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002611
Guido van Rossum9b76da62007-04-11 01:09:03 +00002612 # Systematic tests of the text I/O API
2613
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002614 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002615 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002616 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002617 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002618 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002619 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002620 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002621 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002622 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002623 self.assertEqual(f.tell(), 0)
2624 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002625 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002626 self.assertEqual(f.seek(0), 0)
2627 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002628 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002629 self.assertEqual(f.read(2), "ab")
2630 self.assertEqual(f.read(1), "c")
2631 self.assertEqual(f.read(1), "")
2632 self.assertEqual(f.read(), "")
2633 self.assertEqual(f.tell(), cookie)
2634 self.assertEqual(f.seek(0), 0)
2635 self.assertEqual(f.seek(0, 2), cookie)
2636 self.assertEqual(f.write("def"), 3)
2637 self.assertEqual(f.seek(cookie), cookie)
2638 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002639 if enc.startswith("utf"):
2640 self.multi_line_test(f, enc)
2641 f.close()
2642
2643 def multi_line_test(self, f, enc):
2644 f.seek(0)
2645 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002646 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002647 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002648 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002649 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002650 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002651 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002652 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002653 wlines.append((f.tell(), line))
2654 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002655 f.seek(0)
2656 rlines = []
2657 while True:
2658 pos = f.tell()
2659 line = f.readline()
2660 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002661 break
2662 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002663 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002665 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002666 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002667 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002668 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002669 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002670 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002671 p2 = f.tell()
2672 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002673 self.assertEqual(f.tell(), p0)
2674 self.assertEqual(f.readline(), "\xff\n")
2675 self.assertEqual(f.tell(), p1)
2676 self.assertEqual(f.readline(), "\xff\n")
2677 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002678 f.seek(0)
2679 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002680 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002681 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002682 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002683 f.close()
2684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002685 def test_seeking(self):
2686 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002687 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002688 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002689 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002690 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002691 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002692 suffix = bytes(u_suffix.encode("utf-8"))
2693 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002694 with self.open(support.TESTFN, "wb") as f:
2695 f.write(line*2)
2696 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2697 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002698 self.assertEqual(s, str(prefix, "ascii"))
2699 self.assertEqual(f.tell(), prefix_size)
2700 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002702 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002703 # Regression test for a specific bug
2704 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002705 with self.open(support.TESTFN, "wb") as f:
2706 f.write(data)
2707 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2708 f._CHUNK_SIZE # Just test that it exists
2709 f._CHUNK_SIZE = 2
2710 f.readline()
2711 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002712
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002713 def test_seek_and_tell(self):
2714 #Test seek/tell using the StatefulIncrementalDecoder.
2715 # Make test faster by doing smaller seeks
2716 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002717
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002718 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002719 """Tell/seek to various points within a data stream and ensure
2720 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002721 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002722 f.write(data)
2723 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002724 f = self.open(support.TESTFN, encoding='test_decoder')
2725 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002726 decoded = f.read()
2727 f.close()
2728
Neal Norwitze2b07052008-03-18 19:52:05 +00002729 for i in range(min_pos, len(decoded) + 1): # seek positions
2730 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002731 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002732 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002733 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002734 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002735 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002736 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002737 f.close()
2738
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002739 # Enable the test decoder.
2740 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002741
2742 # Run the tests.
2743 try:
2744 # Try each test case.
2745 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002746 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002747
2748 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002749 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2750 offset = CHUNK_SIZE - len(input)//2
2751 prefix = b'.'*offset
2752 # Don't bother seeking into the prefix (takes too long).
2753 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002754 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002755
2756 # Ensure our test decoder won't interfere with subsequent tests.
2757 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002758 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002759
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002761 data = "1234567890"
2762 tests = ("utf-16",
2763 "utf-16-le",
2764 "utf-16-be",
2765 "utf-32",
2766 "utf-32-le",
2767 "utf-32-be")
2768 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002769 buf = self.BytesIO()
2770 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002771 # Check if the BOM is written only once (see issue1753).
2772 f.write(data)
2773 f.write(data)
2774 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002775 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002776 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002777 self.assertEqual(f.read(), data * 2)
2778 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002779
Benjamin Petersona1b49012009-03-31 23:11:32 +00002780 def test_unreadable(self):
2781 class UnReadable(self.BytesIO):
2782 def readable(self):
2783 return False
2784 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002785 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002786
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002787 def test_read_one_by_one(self):
2788 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002789 reads = ""
2790 while True:
2791 c = txt.read(1)
2792 if not c:
2793 break
2794 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002795 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002796
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002797 def test_readlines(self):
2798 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2799 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2800 txt.seek(0)
2801 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2802 txt.seek(0)
2803 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2804
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002805 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002806 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002807 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002808 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002809 reads = ""
2810 while True:
2811 c = txt.read(128)
2812 if not c:
2813 break
2814 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002815 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002816
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002817 def test_writelines(self):
2818 l = ['ab', 'cd', 'ef']
2819 buf = self.BytesIO()
2820 txt = self.TextIOWrapper(buf)
2821 txt.writelines(l)
2822 txt.flush()
2823 self.assertEqual(buf.getvalue(), b'abcdef')
2824
2825 def test_writelines_userlist(self):
2826 l = UserList(['ab', 'cd', 'ef'])
2827 buf = self.BytesIO()
2828 txt = self.TextIOWrapper(buf)
2829 txt.writelines(l)
2830 txt.flush()
2831 self.assertEqual(buf.getvalue(), b'abcdef')
2832
2833 def test_writelines_error(self):
2834 txt = self.TextIOWrapper(self.BytesIO())
2835 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2836 self.assertRaises(TypeError, txt.writelines, None)
2837 self.assertRaises(TypeError, txt.writelines, b'abc')
2838
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002839 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002840 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002841
2842 # read one char at a time
2843 reads = ""
2844 while True:
2845 c = txt.read(1)
2846 if not c:
2847 break
2848 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002849 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002850
2851 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002852 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002853 txt._CHUNK_SIZE = 4
2854
2855 reads = ""
2856 while True:
2857 c = txt.read(4)
2858 if not c:
2859 break
2860 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002861 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002862
2863 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002864 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002865 txt._CHUNK_SIZE = 4
2866
2867 reads = txt.read(4)
2868 reads += txt.read(4)
2869 reads += txt.readline()
2870 reads += txt.readline()
2871 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002872 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002873
2874 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002875 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002876 txt._CHUNK_SIZE = 4
2877
2878 reads = txt.read(4)
2879 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002880 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002881
2882 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002883 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002884 txt._CHUNK_SIZE = 4
2885
2886 reads = txt.read(4)
2887 pos = txt.tell()
2888 txt.seek(0)
2889 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002890 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002891
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002892 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893 buffer = self.BytesIO(self.testdata)
2894 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002895
2896 self.assertEqual(buffer.seekable(), txt.seekable())
2897
Antoine Pitroue4501852009-05-14 18:55:55 +00002898 def test_append_bom(self):
2899 # The BOM is not written again when appending to a non-empty file
2900 filename = support.TESTFN
2901 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2902 with self.open(filename, 'w', encoding=charset) as f:
2903 f.write('aaa')
2904 pos = f.tell()
2905 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002906 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002907
2908 with self.open(filename, 'a', encoding=charset) as f:
2909 f.write('xxx')
2910 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002911 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002912
2913 def test_seek_bom(self):
2914 # Same test, but when seeking manually
2915 filename = support.TESTFN
2916 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2917 with self.open(filename, 'w', encoding=charset) as f:
2918 f.write('aaa')
2919 pos = f.tell()
2920 with self.open(filename, 'r+', encoding=charset) as f:
2921 f.seek(pos)
2922 f.write('zzz')
2923 f.seek(0)
2924 f.write('bbb')
2925 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002926 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002927
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002928 def test_seek_append_bom(self):
2929 # Same test, but first seek to the start and then to the end
2930 filename = support.TESTFN
2931 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2932 with self.open(filename, 'w', encoding=charset) as f:
2933 f.write('aaa')
2934 with self.open(filename, 'a', encoding=charset) as f:
2935 f.seek(0)
2936 f.seek(0, self.SEEK_END)
2937 f.write('xxx')
2938 with self.open(filename, 'rb') as f:
2939 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2940
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002941 def test_errors_property(self):
2942 with self.open(support.TESTFN, "w") as f:
2943 self.assertEqual(f.errors, "strict")
2944 with self.open(support.TESTFN, "w", errors="replace") as f:
2945 self.assertEqual(f.errors, "replace")
2946
Brett Cannon31f59292011-02-21 19:29:56 +00002947 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002948 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002949 def test_threads_write(self):
2950 # Issue6750: concurrent writes could duplicate data
2951 event = threading.Event()
2952 with self.open(support.TESTFN, "w", buffering=1) as f:
2953 def run(n):
2954 text = "Thread%03d\n" % n
2955 event.wait()
2956 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002957 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002958 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002959 with support.start_threads(threads, event.set):
2960 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002961 with self.open(support.TESTFN) as f:
2962 content = f.read()
2963 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002964 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002965
Antoine Pitrou6be88762010-05-03 16:48:20 +00002966 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002967 # Test that text file is closed despite failed flush
2968 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002969 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002970 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002971 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002972 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002973 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002974 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002975 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002976 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002977 self.assertTrue(txt.buffer.closed)
2978 self.assertTrue(closed) # flush() called
2979 self.assertFalse(closed[0]) # flush() called before file closed
2980 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002981 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002982
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002983 def test_close_error_on_close(self):
2984 buffer = self.BytesIO(self.testdata)
2985 def bad_flush():
2986 raise OSError('flush')
2987 def bad_close():
2988 raise OSError('close')
2989 buffer.close = bad_close
2990 txt = self.TextIOWrapper(buffer, encoding="ascii")
2991 txt.flush = bad_flush
2992 with self.assertRaises(OSError) as err: # exception not swallowed
2993 txt.close()
2994 self.assertEqual(err.exception.args, ('close',))
2995 self.assertIsInstance(err.exception.__context__, OSError)
2996 self.assertEqual(err.exception.__context__.args, ('flush',))
2997 self.assertFalse(txt.closed)
2998
2999 def test_nonnormalized_close_error_on_close(self):
3000 # Issue #21677
3001 buffer = self.BytesIO(self.testdata)
3002 def bad_flush():
3003 raise non_existing_flush
3004 def bad_close():
3005 raise non_existing_close
3006 buffer.close = bad_close
3007 txt = self.TextIOWrapper(buffer, encoding="ascii")
3008 txt.flush = bad_flush
3009 with self.assertRaises(NameError) as err: # exception not swallowed
3010 txt.close()
3011 self.assertIn('non_existing_close', str(err.exception))
3012 self.assertIsInstance(err.exception.__context__, NameError)
3013 self.assertIn('non_existing_flush', str(err.exception.__context__))
3014 self.assertFalse(txt.closed)
3015
Antoine Pitrou6be88762010-05-03 16:48:20 +00003016 def test_multi_close(self):
3017 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3018 txt.close()
3019 txt.close()
3020 txt.close()
3021 self.assertRaises(ValueError, txt.flush)
3022
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003023 def test_unseekable(self):
3024 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3025 self.assertRaises(self.UnsupportedOperation, txt.tell)
3026 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3027
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003028 def test_readonly_attributes(self):
3029 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3030 buf = self.BytesIO(self.testdata)
3031 with self.assertRaises(AttributeError):
3032 txt.buffer = buf
3033
Antoine Pitroue96ec682011-07-23 21:46:35 +02003034 def test_rawio(self):
3035 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3036 # that subprocess.Popen() can have the required unbuffered
3037 # semantics with universal_newlines=True.
3038 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3039 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3040 # Reads
3041 self.assertEqual(txt.read(4), 'abcd')
3042 self.assertEqual(txt.readline(), 'efghi\n')
3043 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3044
3045 def test_rawio_write_through(self):
3046 # Issue #12591: with write_through=True, writes don't need a flush
3047 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3048 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3049 write_through=True)
3050 txt.write('1')
3051 txt.write('23\n4')
3052 txt.write('5')
3053 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3054
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003055 def test_bufio_write_through(self):
3056 # Issue #21396: write_through=True doesn't force a flush()
3057 # on the underlying binary buffered object.
3058 flush_called, write_called = [], []
3059 class BufferedWriter(self.BufferedWriter):
3060 def flush(self, *args, **kwargs):
3061 flush_called.append(True)
3062 return super().flush(*args, **kwargs)
3063 def write(self, *args, **kwargs):
3064 write_called.append(True)
3065 return super().write(*args, **kwargs)
3066
3067 rawio = self.BytesIO()
3068 data = b"a"
3069 bufio = BufferedWriter(rawio, len(data)*2)
3070 textio = self.TextIOWrapper(bufio, encoding='ascii',
3071 write_through=True)
3072 # write to the buffered io but don't overflow the buffer
3073 text = data.decode('ascii')
3074 textio.write(text)
3075
3076 # buffer.flush is not called with write_through=True
3077 self.assertFalse(flush_called)
3078 # buffer.write *is* called with write_through=True
3079 self.assertTrue(write_called)
3080 self.assertEqual(rawio.getvalue(), b"") # no flush
3081
3082 write_called = [] # reset
3083 textio.write(text * 10) # total content is larger than bufio buffer
3084 self.assertTrue(write_called)
3085 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3086
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003087 def test_read_nonbytes(self):
3088 # Issue #17106
3089 # Crash when underlying read() returns non-bytes
3090 t = self.TextIOWrapper(self.StringIO('a'))
3091 self.assertRaises(TypeError, t.read, 1)
3092 t = self.TextIOWrapper(self.StringIO('a'))
3093 self.assertRaises(TypeError, t.readline)
3094 t = self.TextIOWrapper(self.StringIO('a'))
3095 self.assertRaises(TypeError, t.read)
3096
3097 def test_illegal_decoder(self):
3098 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003099 # Bypass the early encoding check added in issue 20404
3100 def _make_illegal_wrapper():
3101 quopri = codecs.lookup("quopri")
3102 quopri._is_text_encoding = True
3103 try:
3104 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3105 newline='\n', encoding="quopri")
3106 finally:
3107 quopri._is_text_encoding = False
3108 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003109 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003110 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003111 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003112 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003113 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003114 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003115 self.assertRaises(TypeError, t.read)
3116
Antoine Pitrou712cb732013-12-21 15:51:54 +01003117 def _check_create_at_shutdown(self, **kwargs):
3118 # Issue #20037: creating a TextIOWrapper at shutdown
3119 # shouldn't crash the interpreter.
3120 iomod = self.io.__name__
3121 code = """if 1:
3122 import codecs
3123 import {iomod} as io
3124
3125 # Avoid looking up codecs at shutdown
3126 codecs.lookup('utf-8')
3127
3128 class C:
3129 def __init__(self):
3130 self.buf = io.BytesIO()
3131 def __del__(self):
3132 io.TextIOWrapper(self.buf, **{kwargs})
3133 print("ok")
3134 c = C()
3135 """.format(iomod=iomod, kwargs=kwargs)
3136 return assert_python_ok("-c", code)
3137
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003138 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003139 def test_create_at_shutdown_without_encoding(self):
3140 rc, out, err = self._check_create_at_shutdown()
3141 if err:
3142 # Can error out with a RuntimeError if the module state
3143 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003144 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003145 else:
3146 self.assertEqual("ok", out.decode().strip())
3147
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003148 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003149 def test_create_at_shutdown_with_encoding(self):
3150 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3151 errors='strict')
3152 self.assertFalse(err)
3153 self.assertEqual("ok", out.decode().strip())
3154
Antoine Pitroub8503892014-04-29 10:14:02 +02003155 def test_read_byteslike(self):
3156 r = MemviewBytesIO(b'Just some random string\n')
3157 t = self.TextIOWrapper(r, 'utf-8')
3158
3159 # TextIOwrapper will not read the full string, because
3160 # we truncate it to a multiple of the native int size
3161 # so that we can construct a more complex memoryview.
3162 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3163
3164 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3165
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003166 def test_issue22849(self):
3167 class F(object):
3168 def readable(self): return True
3169 def writable(self): return True
3170 def seekable(self): return True
3171
3172 for i in range(10):
3173 try:
3174 self.TextIOWrapper(F(), encoding='utf-8')
3175 except Exception:
3176 pass
3177
3178 F.tell = lambda x: 0
3179 t = self.TextIOWrapper(F(), encoding='utf-8')
3180
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003181
Antoine Pitroub8503892014-04-29 10:14:02 +02003182class MemviewBytesIO(io.BytesIO):
3183 '''A BytesIO object whose read method returns memoryviews
3184 rather than bytes'''
3185
3186 def read1(self, len_):
3187 return _to_memoryview(super().read1(len_))
3188
3189 def read(self, len_):
3190 return _to_memoryview(super().read(len_))
3191
3192def _to_memoryview(buf):
3193 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3194
3195 arr = array.array('i')
3196 idx = len(buf) - len(buf) % arr.itemsize
3197 arr.frombytes(buf[:idx])
3198 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003199
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003201class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003202 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10003203 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003204
3205 def test_initialization(self):
3206 r = self.BytesIO(b"\xc3\xa9\n\n")
3207 b = self.BufferedReader(r, 1000)
3208 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003209 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3210 self.assertRaises(ValueError, t.read)
3211
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003212 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3213 self.assertRaises(Exception, repr, t)
3214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003215 def test_garbage_collection(self):
3216 # C TextIOWrapper objects are collected, and collecting them flushes
3217 # all data to disk.
3218 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003219 with support.check_warnings(('', ResourceWarning)):
3220 rawio = io.FileIO(support.TESTFN, "wb")
3221 b = self.BufferedWriter(rawio)
3222 t = self.TextIOWrapper(b, encoding="ascii")
3223 t.write("456def")
3224 t.x = t
3225 wr = weakref.ref(t)
3226 del t
3227 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003228 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003229 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003230 self.assertEqual(f.read(), b"456def")
3231
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003232 def test_rwpair_cleared_before_textio(self):
3233 # Issue 13070: TextIOWrapper's finalization would crash when called
3234 # after the reference to the underlying BufferedRWPair's writer got
3235 # cleared by the GC.
3236 for i in range(1000):
3237 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3238 t1 = self.TextIOWrapper(b1, encoding="ascii")
3239 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3240 t2 = self.TextIOWrapper(b2, encoding="ascii")
3241 # circular references
3242 t1.buddy = t2
3243 t2.buddy = t1
3244 support.gc_collect()
3245
3246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003247class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003248 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02003249 #shutdown_error = "LookupError: unknown encoding: ascii"
3250 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003251
3252
3253class IncrementalNewlineDecoderTest(unittest.TestCase):
3254
3255 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003256 # UTF-8 specific tests for a newline decoder
3257 def _check_decode(b, s, **kwargs):
3258 # We exercise getstate() / setstate() as well as decode()
3259 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003260 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003261 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003262 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003263
Antoine Pitrou180a3362008-12-14 16:36:46 +00003264 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003265
Antoine Pitrou180a3362008-12-14 16:36:46 +00003266 _check_decode(b'\xe8', "")
3267 _check_decode(b'\xa2', "")
3268 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003269
Antoine Pitrou180a3362008-12-14 16:36:46 +00003270 _check_decode(b'\xe8', "")
3271 _check_decode(b'\xa2', "")
3272 _check_decode(b'\x88', "\u8888")
3273
3274 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003275 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3276
Antoine Pitrou180a3362008-12-14 16:36:46 +00003277 decoder.reset()
3278 _check_decode(b'\n', "\n")
3279 _check_decode(b'\r', "")
3280 _check_decode(b'', "\n", final=True)
3281 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003282
Antoine Pitrou180a3362008-12-14 16:36:46 +00003283 _check_decode(b'\r', "")
3284 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003285
Antoine Pitrou180a3362008-12-14 16:36:46 +00003286 _check_decode(b'\r\r\n', "\n\n")
3287 _check_decode(b'\r', "")
3288 _check_decode(b'\r', "\n")
3289 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003290
Antoine Pitrou180a3362008-12-14 16:36:46 +00003291 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3292 _check_decode(b'\xe8\xa2\x88', "\u8888")
3293 _check_decode(b'\n', "\n")
3294 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3295 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003296
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003297 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003298 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003299 if encoding is not None:
3300 encoder = codecs.getincrementalencoder(encoding)()
3301 def _decode_bytewise(s):
3302 # Decode one byte at a time
3303 for b in encoder.encode(s):
3304 result.append(decoder.decode(bytes([b])))
3305 else:
3306 encoder = None
3307 def _decode_bytewise(s):
3308 # Decode one char at a time
3309 for c in s:
3310 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003311 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003312 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003313 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003314 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003315 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003316 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003317 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003318 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003319 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003320 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003321 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003322 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003323 input = "abc"
3324 if encoder is not None:
3325 encoder.reset()
3326 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003327 self.assertEqual(decoder.decode(input), "abc")
3328 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003329
3330 def test_newline_decoder(self):
3331 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003332 # None meaning the IncrementalNewlineDecoder takes unicode input
3333 # rather than bytes input
3334 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003335 'utf-16', 'utf-16-le', 'utf-16-be',
3336 'utf-32', 'utf-32-le', 'utf-32-be',
3337 )
3338 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003339 decoder = enc and codecs.getincrementaldecoder(enc)()
3340 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3341 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003342 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003343 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3344 self.check_newline_decoding_utf8(decoder)
3345
Antoine Pitrou66913e22009-03-06 23:40:56 +00003346 def test_newline_bytes(self):
3347 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3348 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003349 self.assertEqual(dec.newlines, None)
3350 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3351 self.assertEqual(dec.newlines, None)
3352 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3353 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003354 dec = self.IncrementalNewlineDecoder(None, translate=False)
3355 _check(dec)
3356 dec = self.IncrementalNewlineDecoder(None, translate=True)
3357 _check(dec)
3358
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003359class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3360 pass
3361
3362class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3363 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003364
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003365
Guido van Rossum01a27522007-03-07 01:00:12 +00003366# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003367
Guido van Rossum5abbf752007-08-27 17:39:33 +00003368class MiscIOTest(unittest.TestCase):
3369
Barry Warsaw40e82462008-11-20 20:14:50 +00003370 def tearDown(self):
3371 support.unlink(support.TESTFN)
3372
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003373 def test___all__(self):
3374 for name in self.io.__all__:
3375 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003376 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003377 if name == "open":
3378 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003379 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003380 self.assertTrue(issubclass(obj, Exception), name)
3381 elif not name.startswith("SEEK_"):
3382 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003383
Barry Warsaw40e82462008-11-20 20:14:50 +00003384 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003385 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003386 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003387 f.close()
3388
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003389 with support.check_warnings(('', DeprecationWarning)):
3390 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003391 self.assertEqual(f.name, support.TESTFN)
3392 self.assertEqual(f.buffer.name, support.TESTFN)
3393 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3394 self.assertEqual(f.mode, "U")
3395 self.assertEqual(f.buffer.mode, "rb")
3396 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003397 f.close()
3398
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003399 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003400 self.assertEqual(f.mode, "w+")
3401 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3402 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003403
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003404 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003405 self.assertEqual(g.mode, "wb")
3406 self.assertEqual(g.raw.mode, "wb")
3407 self.assertEqual(g.name, f.fileno())
3408 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003409 f.close()
3410 g.close()
3411
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003412 def test_io_after_close(self):
3413 for kwargs in [
3414 {"mode": "w"},
3415 {"mode": "wb"},
3416 {"mode": "w", "buffering": 1},
3417 {"mode": "w", "buffering": 2},
3418 {"mode": "wb", "buffering": 0},
3419 {"mode": "r"},
3420 {"mode": "rb"},
3421 {"mode": "r", "buffering": 1},
3422 {"mode": "r", "buffering": 2},
3423 {"mode": "rb", "buffering": 0},
3424 {"mode": "w+"},
3425 {"mode": "w+b"},
3426 {"mode": "w+", "buffering": 1},
3427 {"mode": "w+", "buffering": 2},
3428 {"mode": "w+b", "buffering": 0},
3429 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003430 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003431 f.close()
3432 self.assertRaises(ValueError, f.flush)
3433 self.assertRaises(ValueError, f.fileno)
3434 self.assertRaises(ValueError, f.isatty)
3435 self.assertRaises(ValueError, f.__iter__)
3436 if hasattr(f, "peek"):
3437 self.assertRaises(ValueError, f.peek, 1)
3438 self.assertRaises(ValueError, f.read)
3439 if hasattr(f, "read1"):
3440 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003441 if hasattr(f, "readall"):
3442 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003443 if hasattr(f, "readinto"):
3444 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003445 if hasattr(f, "readinto1"):
3446 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003447 self.assertRaises(ValueError, f.readline)
3448 self.assertRaises(ValueError, f.readlines)
3449 self.assertRaises(ValueError, f.seek, 0)
3450 self.assertRaises(ValueError, f.tell)
3451 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003452 self.assertRaises(ValueError, f.write,
3453 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003454 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003455 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003456
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003457 def test_blockingioerror(self):
3458 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003459 class C(str):
3460 pass
3461 c = C("")
3462 b = self.BlockingIOError(1, c)
3463 c.b = b
3464 b.c = c
3465 wr = weakref.ref(c)
3466 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003467 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003468 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003469
3470 def test_abcs(self):
3471 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003472 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3473 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3474 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3475 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003476
3477 def _check_abc_inheritance(self, abcmodule):
3478 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003479 self.assertIsInstance(f, abcmodule.IOBase)
3480 self.assertIsInstance(f, abcmodule.RawIOBase)
3481 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3482 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003483 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003484 self.assertIsInstance(f, abcmodule.IOBase)
3485 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3486 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3487 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003488 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003489 self.assertIsInstance(f, abcmodule.IOBase)
3490 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3491 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3492 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003493
3494 def test_abc_inheritance(self):
3495 # Test implementations inherit from their respective ABCs
3496 self._check_abc_inheritance(self)
3497
3498 def test_abc_inheritance_official(self):
3499 # Test implementations inherit from the official ABCs of the
3500 # baseline "io" module.
3501 self._check_abc_inheritance(io)
3502
Antoine Pitroue033e062010-10-29 10:38:18 +00003503 def _check_warn_on_dealloc(self, *args, **kwargs):
3504 f = open(*args, **kwargs)
3505 r = repr(f)
3506 with self.assertWarns(ResourceWarning) as cm:
3507 f = None
3508 support.gc_collect()
3509 self.assertIn(r, str(cm.warning.args[0]))
3510
3511 def test_warn_on_dealloc(self):
3512 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3513 self._check_warn_on_dealloc(support.TESTFN, "wb")
3514 self._check_warn_on_dealloc(support.TESTFN, "w")
3515
3516 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3517 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003518 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003519 for fd in fds:
3520 try:
3521 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003522 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003523 if e.errno != errno.EBADF:
3524 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003525 self.addCleanup(cleanup_fds)
3526 r, w = os.pipe()
3527 fds += r, w
3528 self._check_warn_on_dealloc(r, *args, **kwargs)
3529 # When using closefd=False, there's no warning
3530 r, w = os.pipe()
3531 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003532 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003533 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003534
3535 def test_warn_on_dealloc_fd(self):
3536 self._check_warn_on_dealloc_fd("rb", buffering=0)
3537 self._check_warn_on_dealloc_fd("rb")
3538 self._check_warn_on_dealloc_fd("r")
3539
3540
Antoine Pitrou243757e2010-11-05 21:15:39 +00003541 def test_pickling(self):
3542 # Pickling file objects is forbidden
3543 for kwargs in [
3544 {"mode": "w"},
3545 {"mode": "wb"},
3546 {"mode": "wb", "buffering": 0},
3547 {"mode": "r"},
3548 {"mode": "rb"},
3549 {"mode": "rb", "buffering": 0},
3550 {"mode": "w+"},
3551 {"mode": "w+b"},
3552 {"mode": "w+b", "buffering": 0},
3553 ]:
3554 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3555 with self.open(support.TESTFN, **kwargs) as f:
3556 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3557
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003558 def test_nonblock_pipe_write_bigbuf(self):
3559 self._test_nonblock_pipe_write(16*1024)
3560
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003561 def test_nonblock_pipe_write_smallbuf(self):
3562 self._test_nonblock_pipe_write(1024)
3563
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003564 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3565 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003566 def _test_nonblock_pipe_write(self, bufsize):
3567 sent = []
3568 received = []
3569 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003570 os.set_blocking(r, False)
3571 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003572
3573 # To exercise all code paths in the C implementation we need
3574 # to play with buffer sizes. For instance, if we choose a
3575 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3576 # then we will never get a partial write of the buffer.
3577 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3578 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3579
3580 with rf, wf:
3581 for N in 9999, 73, 7574:
3582 try:
3583 i = 0
3584 while True:
3585 msg = bytes([i % 26 + 97]) * N
3586 sent.append(msg)
3587 wf.write(msg)
3588 i += 1
3589
3590 except self.BlockingIOError as e:
3591 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003592 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003593 sent[-1] = sent[-1][:e.characters_written]
3594 received.append(rf.read())
3595 msg = b'BLOCKED'
3596 wf.write(msg)
3597 sent.append(msg)
3598
3599 while True:
3600 try:
3601 wf.flush()
3602 break
3603 except self.BlockingIOError as e:
3604 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003605 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003606 self.assertEqual(e.characters_written, 0)
3607 received.append(rf.read())
3608
3609 received += iter(rf.read, None)
3610
3611 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003612 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003613 self.assertTrue(wf.closed)
3614 self.assertTrue(rf.closed)
3615
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003616 def test_create_fail(self):
3617 # 'x' mode fails if file is existing
3618 with self.open(support.TESTFN, 'w'):
3619 pass
3620 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3621
3622 def test_create_writes(self):
3623 # 'x' mode opens for writing
3624 with self.open(support.TESTFN, 'xb') as f:
3625 f.write(b"spam")
3626 with self.open(support.TESTFN, 'rb') as f:
3627 self.assertEqual(b"spam", f.read())
3628
Christian Heimes7b648752012-09-10 14:48:43 +02003629 def test_open_allargs(self):
3630 # there used to be a buffer overflow in the parser for rawmode
3631 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3632
3633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003634class CMiscIOTest(MiscIOTest):
3635 io = io
3636
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003637 def test_readinto_buffer_overflow(self):
3638 # Issue #18025
3639 class BadReader(self.io.BufferedIOBase):
3640 def read(self, n=-1):
3641 return b'x' * 10**6
3642 bufio = BadReader()
3643 b = bytearray(2)
3644 self.assertRaises(ValueError, bufio.readinto, b)
3645
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003646 @unittest.skipUnless(threading, 'Threading required for this test.')
3647 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3648 # Issue #23309: deadlocks at shutdown should be avoided when a
3649 # daemon thread and the main thread both write to a file.
3650 code = """if 1:
3651 import sys
3652 import time
3653 import threading
3654
3655 file = sys.{stream_name}
3656
3657 def run():
3658 while True:
3659 file.write('.')
3660 file.flush()
3661
3662 thread = threading.Thread(target=run)
3663 thread.daemon = True
3664 thread.start()
3665
3666 time.sleep(0.5)
3667 file.write('!')
3668 file.flush()
3669 """.format_map(locals())
3670 res, _ = run_python_until_end("-c", code)
3671 err = res.err.decode()
3672 if res.rc != 0:
3673 # Failure: should be a fatal error
3674 self.assertIn("Fatal Python error: could not acquire lock "
3675 "for <_io.BufferedWriter name='<{stream_name}>'> "
3676 "at interpreter shutdown, possibly due to "
3677 "daemon threads".format_map(locals()),
3678 err)
3679 else:
3680 self.assertFalse(err.strip('.!'))
3681
3682 def test_daemon_threads_shutdown_stdout_deadlock(self):
3683 self.check_daemon_threads_shutdown_deadlock('stdout')
3684
3685 def test_daemon_threads_shutdown_stderr_deadlock(self):
3686 self.check_daemon_threads_shutdown_deadlock('stderr')
3687
3688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689class PyMiscIOTest(MiscIOTest):
3690 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003691
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003692
3693@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3694class SignalsTest(unittest.TestCase):
3695
3696 def setUp(self):
3697 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3698
3699 def tearDown(self):
3700 signal.signal(signal.SIGALRM, self.oldalrm)
3701
3702 def alarm_interrupt(self, sig, frame):
3703 1/0
3704
3705 @unittest.skipUnless(threading, 'Threading required for this test.')
3706 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3707 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003708 invokes the signal handler, and bubbles up the exception raised
3709 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003710 read_results = []
3711 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003712 if hasattr(signal, 'pthread_sigmask'):
3713 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003714 s = os.read(r, 1)
3715 read_results.append(s)
3716 t = threading.Thread(target=_read)
3717 t.daemon = True
3718 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003719 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003720 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003721 try:
3722 wio = self.io.open(w, **fdopen_kwargs)
3723 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003724 # Fill the pipe enough that the write will be blocking.
3725 # It will be interrupted by the timer armed above. Since the
3726 # other thread has read one byte, the low-level write will
3727 # return with a successful (partial) result rather than an EINTR.
3728 # The buffered IO layer must check for pending signal
3729 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003730 signal.alarm(1)
3731 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003732 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003733 finally:
3734 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003735 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003736 # We got one byte, get another one and check that it isn't a
3737 # repeat of the first one.
3738 read_results.append(os.read(r, 1))
3739 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3740 finally:
3741 os.close(w)
3742 os.close(r)
3743 # This is deliberate. If we didn't close the file descriptor
3744 # before closing wio, wio would try to flush its internal
3745 # buffer, and block again.
3746 try:
3747 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003748 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003749 if e.errno != errno.EBADF:
3750 raise
3751
3752 def test_interrupted_write_unbuffered(self):
3753 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3754
3755 def test_interrupted_write_buffered(self):
3756 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3757
Victor Stinner6ab72862014-09-03 23:32:28 +02003758 # Issue #22331: The test hangs on FreeBSD 7.2
3759 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003760 def test_interrupted_write_text(self):
3761 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3762
Brett Cannon31f59292011-02-21 19:29:56 +00003763 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003764 def check_reentrant_write(self, data, **fdopen_kwargs):
3765 def on_alarm(*args):
3766 # Will be called reentrantly from the same thread
3767 wio.write(data)
3768 1/0
3769 signal.signal(signal.SIGALRM, on_alarm)
3770 r, w = os.pipe()
3771 wio = self.io.open(w, **fdopen_kwargs)
3772 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003773 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003774 # Either the reentrant call to wio.write() fails with RuntimeError,
3775 # or the signal handler raises ZeroDivisionError.
3776 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3777 while 1:
3778 for i in range(100):
3779 wio.write(data)
3780 wio.flush()
3781 # Make sure the buffer doesn't fill up and block further writes
3782 os.read(r, len(data) * 100)
3783 exc = cm.exception
3784 if isinstance(exc, RuntimeError):
3785 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3786 finally:
3787 wio.close()
3788 os.close(r)
3789
3790 def test_reentrant_write_buffered(self):
3791 self.check_reentrant_write(b"xy", mode="wb")
3792
3793 def test_reentrant_write_text(self):
3794 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3795
Antoine Pitrou707ce822011-02-25 21:24:11 +00003796 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3797 """Check that a buffered read, when it gets interrupted (either
3798 returning a partial result or EINTR), properly invokes the signal
3799 handler and retries if the latter returned successfully."""
3800 r, w = os.pipe()
3801 fdopen_kwargs["closefd"] = False
3802 def alarm_handler(sig, frame):
3803 os.write(w, b"bar")
3804 signal.signal(signal.SIGALRM, alarm_handler)
3805 try:
3806 rio = self.io.open(r, **fdopen_kwargs)
3807 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003808 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003809 # Expected behaviour:
3810 # - first raw read() returns partial b"foo"
3811 # - second raw read() returns EINTR
3812 # - third raw read() returns b"bar"
3813 self.assertEqual(decode(rio.read(6)), "foobar")
3814 finally:
3815 rio.close()
3816 os.close(w)
3817 os.close(r)
3818
Antoine Pitrou20db5112011-08-19 20:32:34 +02003819 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003820 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3821 mode="rb")
3822
Antoine Pitrou20db5112011-08-19 20:32:34 +02003823 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003824 self.check_interrupted_read_retry(lambda x: x,
3825 mode="r")
3826
3827 @unittest.skipUnless(threading, 'Threading required for this test.')
3828 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3829 """Check that a buffered write, when it gets interrupted (either
3830 returning a partial result or EINTR), properly invokes the signal
3831 handler and retries if the latter returned successfully."""
3832 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003833
Antoine Pitrou707ce822011-02-25 21:24:11 +00003834 # A quantity that exceeds the buffer size of an anonymous pipe's
3835 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003836 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003837 r, w = os.pipe()
3838 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003839
Antoine Pitrou707ce822011-02-25 21:24:11 +00003840 # We need a separate thread to read from the pipe and allow the
3841 # write() to finish. This thread is started after the SIGALRM is
3842 # received (forcing a first EINTR in write()).
3843 read_results = []
3844 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003845 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003846 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003847 try:
3848 while not write_finished:
3849 while r in select.select([r], [], [], 1.0)[0]:
3850 s = os.read(r, 1024)
3851 read_results.append(s)
3852 except BaseException as exc:
3853 nonlocal error
3854 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003855 t = threading.Thread(target=_read)
3856 t.daemon = True
3857 def alarm1(sig, frame):
3858 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003859 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003860 def alarm2(sig, frame):
3861 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003862
3863 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00003864 signal.signal(signal.SIGALRM, alarm1)
3865 try:
3866 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003867 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003868 # Expected behaviour:
3869 # - first raw write() is partial (because of the limited pipe buffer
3870 # and the first alarm)
3871 # - second raw write() returns EINTR (because of the second alarm)
3872 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003873 written = wio.write(large_data)
3874 self.assertEqual(N, written)
3875
Antoine Pitrou707ce822011-02-25 21:24:11 +00003876 wio.flush()
3877 write_finished = True
3878 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003879
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003880 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003881 self.assertEqual(N, sum(len(x) for x in read_results))
3882 finally:
3883 write_finished = True
3884 os.close(w)
3885 os.close(r)
3886 # This is deliberate. If we didn't close the file descriptor
3887 # before closing wio, wio would try to flush its internal
3888 # buffer, and could block (in case of failure).
3889 try:
3890 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003891 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003892 if e.errno != errno.EBADF:
3893 raise
3894
Antoine Pitrou20db5112011-08-19 20:32:34 +02003895 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003896 self.check_interrupted_write_retry(b"x", mode="wb")
3897
Antoine Pitrou20db5112011-08-19 20:32:34 +02003898 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003899 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3900
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003901
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003902class CSignalsTest(SignalsTest):
3903 io = io
3904
3905class PySignalsTest(SignalsTest):
3906 io = pyio
3907
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003908 # Handling reentrancy issues would slow down _pyio even more, so the
3909 # tests are disabled.
3910 test_reentrant_write_buffered = None
3911 test_reentrant_write_text = None
3912
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003913
Ezio Melottidaa42c72013-03-23 16:30:16 +02003914def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07003915 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003916 CBufferedReaderTest, PyBufferedReaderTest,
3917 CBufferedWriterTest, PyBufferedWriterTest,
3918 CBufferedRWPairTest, PyBufferedRWPairTest,
3919 CBufferedRandomTest, PyBufferedRandomTest,
3920 StatefulIncrementalDecoderTest,
3921 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3922 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003923 CMiscIOTest, PyMiscIOTest,
3924 CSignalsTest, PySignalsTest,
3925 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003926
3927 # Put the namespaces of the IO module we are testing and some useful mock
3928 # classes in the __dict__ of each test.
3929 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003930 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003931 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3932 c_io_ns = {name : getattr(io, name) for name in all_members}
3933 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3934 globs = globals()
3935 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3936 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3937 # Avoid turning open into a bound method.
3938 py_io_ns["open"] = pyio.OpenWrapper
3939 for test in tests:
3940 if test.__name__.startswith("C"):
3941 for name, obj in c_io_ns.items():
3942 setattr(test, name, obj)
3943 elif test.__name__.startswith("Py"):
3944 for name, obj in py_io_ns.items():
3945 setattr(test, name, obj)
3946
Ezio Melottidaa42c72013-03-23 16:30:16 +02003947 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3948 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003949
3950if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003951 unittest.main()