blob: ce4ed1b8f6a48ff7fbad9409ae3ab6c758a60687 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000032import time
Guido van Rossum28524c72007-02-27 05:47:44 +000033import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000034import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020035import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Berker Peksagce643912015-05-06 06:33:17 +030039from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000040
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000041import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
Martin Panter6bb91f32016-05-28 00:41:57 +000045try:
46 import ctypes
47except ImportError:
48 def byteslike(*pos, **kw):
49 return array.array("b", bytes(*pos, **kw))
50else:
51 def byteslike(*pos, **kw):
52 """Create a bytes-like object having no string or sequence methods"""
53 data = bytes(*pos, **kw)
54 obj = EmptyStruct()
55 ctypes.resize(obj, len(data))
56 memoryview(obj).cast("B")[:] = data
57 return obj
58 class EmptyStruct(ctypes.Structure):
59 pass
60
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061def _default_chunk_size():
62 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000063 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 return f._CHUNK_SIZE
65
66
Antoine Pitrou328ec742010-09-14 18:37:24 +000067class MockRawIOWithoutRead:
68 """A RawIO implementation without read(), so as to exercise the default
69 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000070
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000071 def __init__(self, read_stack=()):
72 self._read_stack = list(read_stack)
73 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000074 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000075 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000079 return len(b)
80
81 def writable(self):
82 return True
83
Guido van Rossum68bbcd22007-02-27 17:19:33 +000084 def fileno(self):
85 return 42
86
87 def readable(self):
88 return True
89
Guido van Rossum01a27522007-03-07 01:00:12 +000090 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000091 return True
92
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000095
96 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0 # same comment as above
98
99 def readinto(self, buf):
100 self._reads += 1
101 max_len = len(buf)
102 try:
103 data = self._read_stack[0]
104 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000105 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000106 return 0
107 if data is None:
108 del self._read_stack[0]
109 return None
110 n = len(data)
111 if len(data) <= max_len:
112 del self._read_stack[0]
113 buf[:n] = data
114 return n
115 else:
116 buf[:] = data[:max_len]
117 self._read_stack[0] = data[max_len:]
118 return max_len
119
120 def truncate(self, pos=None):
121 return pos
122
Antoine Pitrou328ec742010-09-14 18:37:24 +0000123class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
124 pass
125
126class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
127 pass
128
129
130class MockRawIO(MockRawIOWithoutRead):
131
132 def read(self, n=None):
133 self._reads += 1
134 try:
135 return self._read_stack.pop(0)
136 except:
137 self._extraneous_reads += 1
138 return b""
139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000140class CMockRawIO(MockRawIO, io.RawIOBase):
141 pass
142
143class PyMockRawIO(MockRawIO, pyio.RawIOBase):
144 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000145
Guido van Rossuma9e20242007-03-08 00:43:48 +0000146
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000147class MisbehavedRawIO(MockRawIO):
148 def write(self, b):
149 return super().write(b) * 2
150
151 def read(self, n=None):
152 return super().read(n) * 2
153
154 def seek(self, pos, whence):
155 return -123
156
157 def tell(self):
158 return -456
159
160 def readinto(self, buf):
161 super().readinto(buf)
162 return len(buf) * 5
163
164class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
165 pass
166
167class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
168 pass
169
170
171class CloseFailureIO(MockRawIO):
172 closed = 0
173
174 def close(self):
175 if not self.closed:
176 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200177 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000178
179class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
180 pass
181
182class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
183 pass
184
185
186class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
188 def __init__(self, data):
189 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000190 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
192 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000194 self.read_history.append(None if res is None else len(res))
195 return res
196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000197 def readinto(self, b):
198 res = super().readinto(b)
199 self.read_history.append(res)
200 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000201
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000202class CMockFileIO(MockFileIO, io.BytesIO):
203 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000205class PyMockFileIO(MockFileIO, pyio.BytesIO):
206 pass
207
208
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000209class MockUnseekableIO:
210 def seekable(self):
211 return False
212
213 def seek(self, *args):
214 raise self.UnsupportedOperation("not seekable")
215
216 def tell(self, *args):
217 raise self.UnsupportedOperation("not seekable")
218
Martin Panter754aab22016-03-31 07:21:56 +0000219 def truncate(self, *args):
220 raise self.UnsupportedOperation("not seekable")
221
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000222class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
223 UnsupportedOperation = io.UnsupportedOperation
224
225class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
226 UnsupportedOperation = pyio.UnsupportedOperation
227
228
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000229class MockNonBlockWriterIO:
230
231 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000232 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000233 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000235 def pop_written(self):
236 s = b"".join(self._write_stack)
237 self._write_stack[:] = []
238 return s
239
240 def block_on(self, char):
241 """Block when a given char is encountered."""
242 self._blocker_char = char
243
244 def readable(self):
245 return True
246
247 def seekable(self):
248 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000249
Guido van Rossum01a27522007-03-07 01:00:12 +0000250 def writable(self):
251 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000252
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000253 def write(self, b):
254 b = bytes(b)
255 n = -1
256 if self._blocker_char:
257 try:
258 n = b.index(self._blocker_char)
259 except ValueError:
260 pass
261 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100262 if n > 0:
263 # write data up to the first blocker
264 self._write_stack.append(b[:n])
265 return n
266 else:
267 # cancel blocker and indicate would block
268 self._blocker_char = None
269 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000270 self._write_stack.append(b)
271 return len(b)
272
273class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
274 BlockingIOError = io.BlockingIOError
275
276class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
277 BlockingIOError = pyio.BlockingIOError
278
Guido van Rossuma9e20242007-03-08 00:43:48 +0000279
Guido van Rossum28524c72007-02-27 05:47:44 +0000280class IOTest(unittest.TestCase):
281
Neal Norwitze7789b12008-03-24 06:18:09 +0000282 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000283 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000284
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000285 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000286 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000287
Guido van Rossum28524c72007-02-27 05:47:44 +0000288 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000290 f.truncate(0)
291 self.assertEqual(f.tell(), 5)
292 f.seek(0)
293
294 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000298 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000299 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000300 buffer = bytearray(b" world\n\n\n")
301 self.assertEqual(f.write(buffer), 9)
302 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000303 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000304 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000305 self.assertEqual(f.seek(-1, 2), 13)
306 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000307
Guido van Rossum87429772007-04-10 21:06:59 +0000308 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000309 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000310 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000311
Guido van Rossum9b76da62007-04-11 01:09:03 +0000312 def read_ops(self, f, buffered=False):
313 data = f.read(5)
314 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000315 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000316 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000317 self.assertEqual(bytes(data), b" worl")
318 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 self.assertEqual(f.readinto(data), 2)
320 self.assertEqual(len(data), 5)
321 self.assertEqual(data[:2], b"d\n")
322 self.assertEqual(f.seek(0), 0)
323 self.assertEqual(f.read(20), b"hello world\n")
324 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000325 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000326 self.assertEqual(f.seek(-6, 2), 6)
327 self.assertEqual(f.read(5), b"world")
328 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000329 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000330 self.assertEqual(f.seek(-6, 1), 5)
331 self.assertEqual(f.read(5), b" worl")
332 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000333 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000334 if buffered:
335 f.seek(0)
336 self.assertEqual(f.read(), b"hello world\n")
337 f.seek(6)
338 self.assertEqual(f.read(), b"world\n")
339 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000340 f.seek(0)
341 data = byteslike(5)
342 self.assertEqual(f.readinto1(data), 5)
343 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000344
Guido van Rossum34d69e52007-04-10 20:08:41 +0000345 LARGE = 2**31
346
Guido van Rossum53807da2007-04-10 19:01:47 +0000347 def large_file_ops(self, f):
348 assert f.readable()
349 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100350 try:
351 self.assertEqual(f.seek(self.LARGE), self.LARGE)
352 except (OverflowError, ValueError):
353 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000354 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000355 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000356 self.assertEqual(f.tell(), self.LARGE + 3)
357 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000358 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 self.assertEqual(f.tell(), self.LARGE + 2)
360 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000361 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000362 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000363 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
364 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000365 self.assertEqual(f.read(2), b"x")
366
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367 def test_invalid_operations(self):
368 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000369 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000370 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000371 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000372 self.assertRaises(exc, fp.read)
373 self.assertRaises(exc, fp.readline)
374 with self.open(support.TESTFN, "wb", buffering=0) as fp:
375 self.assertRaises(exc, fp.read)
376 self.assertRaises(exc, fp.readline)
377 with self.open(support.TESTFN, "rb", buffering=0) as fp:
378 self.assertRaises(exc, fp.write, b"blah")
379 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000380 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000381 self.assertRaises(exc, fp.write, b"blah")
382 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000384 self.assertRaises(exc, fp.write, "blah")
385 self.assertRaises(exc, fp.writelines, ["blah\n"])
386 # Non-zero seeking from current or end pos
387 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
388 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000389
Martin Panter754aab22016-03-31 07:21:56 +0000390 def test_optional_abilities(self):
391 # Test for OSError when optional APIs are not supported
392 # The purpose of this test is to try fileno(), reading, writing and
393 # seeking operations with various objects that indicate they do not
394 # support these operations.
395
396 def pipe_reader():
397 [r, w] = os.pipe()
398 os.close(w) # So that read() is harmless
399 return self.FileIO(r, "r")
400
401 def pipe_writer():
402 [r, w] = os.pipe()
403 self.addCleanup(os.close, r)
404 # Guarantee that we can write into the pipe without blocking
405 thread = threading.Thread(target=os.read, args=(r, 100))
406 thread.start()
407 self.addCleanup(thread.join)
408 return self.FileIO(w, "w")
409
410 def buffered_reader():
411 return self.BufferedReader(self.MockUnseekableIO())
412
413 def buffered_writer():
414 return self.BufferedWriter(self.MockUnseekableIO())
415
416 def buffered_random():
417 return self.BufferedRandom(self.BytesIO())
418
419 def buffered_rw_pair():
420 return self.BufferedRWPair(self.MockUnseekableIO(),
421 self.MockUnseekableIO())
422
423 def text_reader():
424 class UnseekableReader(self.MockUnseekableIO):
425 writable = self.BufferedIOBase.writable
426 write = self.BufferedIOBase.write
427 return self.TextIOWrapper(UnseekableReader(), "ascii")
428
429 def text_writer():
430 class UnseekableWriter(self.MockUnseekableIO):
431 readable = self.BufferedIOBase.readable
432 read = self.BufferedIOBase.read
433 return self.TextIOWrapper(UnseekableWriter(), "ascii")
434
435 tests = (
436 (pipe_reader, "fr"), (pipe_writer, "fw"),
437 (buffered_reader, "r"), (buffered_writer, "w"),
438 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
439 (text_reader, "r"), (text_writer, "w"),
440 (self.BytesIO, "rws"), (self.StringIO, "rws"),
441 )
442 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000443 with self.subTest(test), test() as obj:
444 readable = "r" in abilities
445 self.assertEqual(obj.readable(), readable)
446 writable = "w" in abilities
447 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000448
449 if isinstance(obj, self.TextIOBase):
450 data = "3"
451 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
452 data = b"3"
453 else:
454 self.fail("Unknown base class")
455
456 if "f" in abilities:
457 obj.fileno()
458 else:
459 self.assertRaises(OSError, obj.fileno)
460
461 if readable:
462 obj.read(1)
463 obj.read()
464 else:
465 self.assertRaises(OSError, obj.read, 1)
466 self.assertRaises(OSError, obj.read)
467
468 if writable:
469 obj.write(data)
470 else:
471 self.assertRaises(OSError, obj.write, data)
472
Martin Panter3ee147f2016-03-31 21:05:31 +0000473 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000474 pipe_reader, pipe_writer):
475 # Pipes seem to appear as seekable on Windows
476 continue
477 seekable = "s" in abilities
478 self.assertEqual(obj.seekable(), seekable)
479
Martin Panter754aab22016-03-31 07:21:56 +0000480 if seekable:
481 obj.tell()
482 obj.seek(0)
483 else:
484 self.assertRaises(OSError, obj.tell)
485 self.assertRaises(OSError, obj.seek, 0)
486
487 if writable and seekable:
488 obj.truncate()
489 obj.truncate(0)
490 else:
491 self.assertRaises(OSError, obj.truncate)
492 self.assertRaises(OSError, obj.truncate, 0)
493
Antoine Pitrou13348842012-01-29 18:36:34 +0100494 def test_open_handles_NUL_chars(self):
495 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300496 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100497
498 bytes_fn = bytes(fn_with_NUL, 'ascii')
499 with warnings.catch_warnings():
500 warnings.simplefilter("ignore", DeprecationWarning)
501 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100502
Guido van Rossum28524c72007-02-27 05:47:44 +0000503 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000504 with self.open(support.TESTFN, "wb", buffering=0) as f:
505 self.assertEqual(f.readable(), False)
506 self.assertEqual(f.writable(), True)
507 self.assertEqual(f.seekable(), True)
508 self.write_ops(f)
509 with self.open(support.TESTFN, "rb", buffering=0) as f:
510 self.assertEqual(f.readable(), True)
511 self.assertEqual(f.writable(), False)
512 self.assertEqual(f.seekable(), True)
513 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000514
Guido van Rossum87429772007-04-10 21:06:59 +0000515 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000516 with self.open(support.TESTFN, "wb") as f:
517 self.assertEqual(f.readable(), False)
518 self.assertEqual(f.writable(), True)
519 self.assertEqual(f.seekable(), True)
520 self.write_ops(f)
521 with self.open(support.TESTFN, "rb") as f:
522 self.assertEqual(f.readable(), True)
523 self.assertEqual(f.writable(), False)
524 self.assertEqual(f.seekable(), True)
525 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000526
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000527 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000528 with self.open(support.TESTFN, "wb") as f:
529 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
530 with self.open(support.TESTFN, "rb") as f:
531 self.assertEqual(f.readline(), b"abc\n")
532 self.assertEqual(f.readline(10), b"def\n")
533 self.assertEqual(f.readline(2), b"xy")
534 self.assertEqual(f.readline(4), b"zzy\n")
535 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000536 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000537 self.assertRaises(TypeError, f.readline, 5.3)
538 with self.open(support.TESTFN, "r") as f:
539 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000540
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300541 def test_readline_nonsizeable(self):
542 # Issue #30061
543 # Crash when readline() returns an object without __len__
544 class R(self.IOBase):
545 def readline(self):
546 return None
547 self.assertRaises((TypeError, StopIteration), next, R())
548
549 def test_next_nonsizeable(self):
550 # Issue #30061
551 # Crash when __next__() returns an object without __len__
552 class R(self.IOBase):
553 def __next__(self):
554 return None
555 self.assertRaises(TypeError, R().readlines, 1)
556
Guido van Rossum28524c72007-02-27 05:47:44 +0000557 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000558 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000559 self.write_ops(f)
560 data = f.getvalue()
561 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000563 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000564
Guido van Rossum53807da2007-04-10 19:01:47 +0000565 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000566 # On Windows and Mac OSX this test comsumes large resources; It takes
567 # a long time to build the >2GB file and takes >2GB of disk space
568 # therefore the resource must be enabled to run this test.
569 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600570 support.requires(
571 'largefile',
572 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000573 with self.open(support.TESTFN, "w+b", 0) as f:
574 self.large_file_ops(f)
575 with self.open(support.TESTFN, "w+b") as f:
576 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000577
578 def test_with_open(self):
579 for bufsize in (0, 1, 100):
580 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000582 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000583 self.assertEqual(f.closed, True)
584 f = None
585 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000586 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000587 1/0
588 except ZeroDivisionError:
589 self.assertEqual(f.closed, True)
590 else:
591 self.fail("1/0 didn't raise an exception")
592
Antoine Pitrou08838b62009-01-21 00:55:13 +0000593 # issue 5008
594 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000596 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000597 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000598 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000599 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000600 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000601 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300602 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000603
Guido van Rossum87429772007-04-10 21:06:59 +0000604 def test_destructor(self):
605 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000607 def __del__(self):
608 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 try:
610 f = super().__del__
611 except AttributeError:
612 pass
613 else:
614 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000615 def close(self):
616 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000617 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000618 def flush(self):
619 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000621 with support.check_warnings(('', ResourceWarning)):
622 f = MyFileIO(support.TESTFN, "wb")
623 f.write(b"xxx")
624 del f
625 support.gc_collect()
626 self.assertEqual(record, [1, 2, 3])
627 with self.open(support.TESTFN, "rb") as f:
628 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629
630 def _check_base_destructor(self, base):
631 record = []
632 class MyIO(base):
633 def __init__(self):
634 # This exercises the availability of attributes on object
635 # destruction.
636 # (in the C version, close() is called by the tp_dealloc
637 # function, not by __del__)
638 self.on_del = 1
639 self.on_close = 2
640 self.on_flush = 3
641 def __del__(self):
642 record.append(self.on_del)
643 try:
644 f = super().__del__
645 except AttributeError:
646 pass
647 else:
648 f()
649 def close(self):
650 record.append(self.on_close)
651 super().close()
652 def flush(self):
653 record.append(self.on_flush)
654 super().flush()
655 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000656 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000657 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000658 self.assertEqual(record, [1, 2, 3])
659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660 def test_IOBase_destructor(self):
661 self._check_base_destructor(self.IOBase)
662
663 def test_RawIOBase_destructor(self):
664 self._check_base_destructor(self.RawIOBase)
665
666 def test_BufferedIOBase_destructor(self):
667 self._check_base_destructor(self.BufferedIOBase)
668
669 def test_TextIOBase_destructor(self):
670 self._check_base_destructor(self.TextIOBase)
671
Guido van Rossum87429772007-04-10 21:06:59 +0000672 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000673 with self.open(support.TESTFN, "wb") as f:
674 f.write(b"xxx")
675 with self.open(support.TESTFN, "rb") as f:
676 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000677
Guido van Rossumd4103952007-04-12 05:44:49 +0000678 def test_array_writes(self):
679 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000680 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000681 def check(f):
682 with f:
683 self.assertEqual(f.write(a), n)
684 f.writelines((a,))
685 check(self.BytesIO())
686 check(self.FileIO(support.TESTFN, "w"))
687 check(self.BufferedWriter(self.MockRawIO()))
688 check(self.BufferedRandom(self.MockRawIO()))
689 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000690
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000691 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000692 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000693 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695 def test_read_closed(self):
696 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000697 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 with self.open(support.TESTFN, "r") as f:
699 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000700 self.assertEqual(file.read(), "egg\n")
701 file.seek(0)
702 file.close()
703 self.assertRaises(ValueError, file.read)
704
705 def test_no_closefd_with_filename(self):
706 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000707 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000708
709 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000710 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000711 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000713 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000715 self.assertEqual(file.buffer.raw.closefd, False)
716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 def test_garbage_collection(self):
718 # FileIO objects are collected, and collecting them flushes
719 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000720 with support.check_warnings(('', ResourceWarning)):
721 f = self.FileIO(support.TESTFN, "wb")
722 f.write(b"abcxxx")
723 f.f = f
724 wr = weakref.ref(f)
725 del f
726 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300727 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000728 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000730
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000731 def test_unbounded_file(self):
732 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
733 zero = "/dev/zero"
734 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000735 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000736 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000737 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000738 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000739 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000740 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000741 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000742 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000743 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000744 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000745 self.assertRaises(OverflowError, f.read)
746
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200747 def check_flush_error_on_close(self, *args, **kwargs):
748 # Test that the file is closed despite failed flush
749 # and that flush() is called before file closed.
750 f = self.open(*args, **kwargs)
751 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000752 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200753 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200754 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000755 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200756 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600757 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200758 self.assertTrue(closed) # flush() called
759 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200760 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200761
762 def test_flush_error_on_close(self):
763 # raw file
764 # Issue #5700: io.FileIO calls flush() after file closed
765 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
766 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
767 self.check_flush_error_on_close(fd, 'wb', buffering=0)
768 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
769 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
770 os.close(fd)
771 # buffered io
772 self.check_flush_error_on_close(support.TESTFN, 'wb')
773 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
774 self.check_flush_error_on_close(fd, 'wb')
775 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
776 self.check_flush_error_on_close(fd, 'wb', closefd=False)
777 os.close(fd)
778 # text io
779 self.check_flush_error_on_close(support.TESTFN, 'w')
780 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
781 self.check_flush_error_on_close(fd, 'w')
782 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
783 self.check_flush_error_on_close(fd, 'w', closefd=False)
784 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000785
786 def test_multi_close(self):
787 f = self.open(support.TESTFN, "wb", buffering=0)
788 f.close()
789 f.close()
790 f.close()
791 self.assertRaises(ValueError, f.flush)
792
Antoine Pitrou328ec742010-09-14 18:37:24 +0000793 def test_RawIOBase_read(self):
794 # Exercise the default RawIOBase.read() implementation (which calls
795 # readinto() internally).
796 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
797 self.assertEqual(rawio.read(2), b"ab")
798 self.assertEqual(rawio.read(2), b"c")
799 self.assertEqual(rawio.read(2), b"d")
800 self.assertEqual(rawio.read(2), None)
801 self.assertEqual(rawio.read(2), b"ef")
802 self.assertEqual(rawio.read(2), b"g")
803 self.assertEqual(rawio.read(2), None)
804 self.assertEqual(rawio.read(2), b"")
805
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400806 def test_types_have_dict(self):
807 test = (
808 self.IOBase(),
809 self.RawIOBase(),
810 self.TextIOBase(),
811 self.StringIO(),
812 self.BytesIO()
813 )
814 for obj in test:
815 self.assertTrue(hasattr(obj, "__dict__"))
816
Ross Lagerwall59142db2011-10-31 20:34:46 +0200817 def test_opener(self):
818 with self.open(support.TESTFN, "w") as f:
819 f.write("egg\n")
820 fd = os.open(support.TESTFN, os.O_RDONLY)
821 def opener(path, flags):
822 return fd
823 with self.open("non-existent", "r", opener=opener) as f:
824 self.assertEqual(f.read(), "egg\n")
825
Barry Warsaw480e2852016-06-08 17:47:26 -0400826 def test_bad_opener_negative_1(self):
827 # Issue #27066.
828 def badopener(fname, flags):
829 return -1
830 with self.assertRaises(ValueError) as cm:
831 open('non-existent', 'r', opener=badopener)
832 self.assertEqual(str(cm.exception), 'opener returned -1')
833
834 def test_bad_opener_other_negative(self):
835 # Issue #27066.
836 def badopener(fname, flags):
837 return -2
838 with self.assertRaises(ValueError) as cm:
839 open('non-existent', 'r', opener=badopener)
840 self.assertEqual(str(cm.exception), 'opener returned -2')
841
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200842 def test_fileio_closefd(self):
843 # Issue #4841
844 with self.open(__file__, 'rb') as f1, \
845 self.open(__file__, 'rb') as f2:
846 fileio = self.FileIO(f1.fileno(), closefd=False)
847 # .__init__() must not close f1
848 fileio.__init__(f2.fileno(), closefd=False)
849 f1.readline()
850 # .close() must not close f2
851 fileio.close()
852 f2.readline()
853
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300854 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200855 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300856 with self.assertRaises(ValueError):
857 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300858
859 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200860 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300861 with self.assertRaises(ValueError):
862 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300863
Martin Panter6bb91f32016-05-28 00:41:57 +0000864 def test_buffered_readinto_mixin(self):
865 # Test the implementation provided by BufferedIOBase
866 class Stream(self.BufferedIOBase):
867 def read(self, size):
868 return b"12345"
869 read1 = read
870 stream = Stream()
871 for method in ("readinto", "readinto1"):
872 with self.subTest(method):
873 buffer = byteslike(5)
874 self.assertEqual(getattr(stream, method)(buffer), 5)
875 self.assertEqual(bytes(buffer), b"12345")
876
Ethan Furmand62548a2016-06-04 14:38:43 -0700877 def test_fspath_support(self):
878 class PathLike:
879 def __init__(self, path):
880 self.path = path
881
882 def __fspath__(self):
883 return self.path
884
885 def check_path_succeeds(path):
886 with self.open(path, "w") as f:
887 f.write("egg\n")
888
889 with self.open(path, "r") as f:
890 self.assertEqual(f.read(), "egg\n")
891
892 check_path_succeeds(PathLike(support.TESTFN))
893 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
894
895 bad_path = PathLike(TypeError)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700896 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700897 self.open(bad_path, 'w')
898
899 # ensure that refcounting is correct with some error conditions
900 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
901 self.open(PathLike(support.TESTFN), 'rwxa')
902
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200905
906 def test_IOBase_finalize(self):
907 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
908 # class which inherits IOBase and an object of this class are caught
909 # in a reference cycle and close() is already in the method cache.
910 class MyIO(self.IOBase):
911 def close(self):
912 pass
913
914 # create an instance to populate the method cache
915 MyIO()
916 obj = MyIO()
917 obj.obj = obj
918 wr = weakref.ref(obj)
919 del MyIO
920 del obj
921 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300922 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000923
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924class PyIOTest(IOTest):
925 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000926
Guido van Rossuma9e20242007-03-08 00:43:48 +0000927
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700928@support.cpython_only
929class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700930
Gregory P. Smith054b0652015-04-14 12:58:05 -0700931 def test_RawIOBase_io_in_pyio_match(self):
932 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200933 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
934 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -0700935 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
936
937 def test_RawIOBase_pyio_in_io_match(self):
938 """Test that c RawIOBase class has all pyio RawIOBase methods"""
939 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
940 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
941
942
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000943class CommonBufferedTests:
944 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
945
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000946 def test_detach(self):
947 raw = self.MockRawIO()
948 buf = self.tp(raw)
949 self.assertIs(buf.detach(), raw)
950 self.assertRaises(ValueError, buf.detach)
951
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600952 repr(buf) # Should still work
953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954 def test_fileno(self):
955 rawio = self.MockRawIO()
956 bufio = self.tp(rawio)
957
Ezio Melottib3aedd42010-11-20 19:04:17 +0000958 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000959
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000960 def test_invalid_args(self):
961 rawio = self.MockRawIO()
962 bufio = self.tp(rawio)
963 # Invalid whence
964 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200965 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000966
967 def test_override_destructor(self):
968 tp = self.tp
969 record = []
970 class MyBufferedIO(tp):
971 def __del__(self):
972 record.append(1)
973 try:
974 f = super().__del__
975 except AttributeError:
976 pass
977 else:
978 f()
979 def close(self):
980 record.append(2)
981 super().close()
982 def flush(self):
983 record.append(3)
984 super().flush()
985 rawio = self.MockRawIO()
986 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000987 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000988 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +0000989 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000990
991 def test_context_manager(self):
992 # Test usability as a context manager
993 rawio = self.MockRawIO()
994 bufio = self.tp(rawio)
995 def _with():
996 with bufio:
997 pass
998 _with()
999 # bufio should now be closed, and using it a second time should raise
1000 # a ValueError.
1001 self.assertRaises(ValueError, _with)
1002
1003 def test_error_through_destructor(self):
1004 # Test that the exception state is not modified by a destructor,
1005 # even if close() fails.
1006 rawio = self.CloseFailureIO()
1007 def f():
1008 self.tp(rawio).xyzzy
1009 with support.captured_output("stderr") as s:
1010 self.assertRaises(AttributeError, f)
1011 s = s.getvalue().strip()
1012 if s:
1013 # The destructor *may* have printed an unraisable error, check it
1014 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001015 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001016 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +00001017
Antoine Pitrou716c4442009-05-23 19:04:03 +00001018 def test_repr(self):
1019 raw = self.MockRawIO()
1020 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +03001021 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001022 self.assertEqual(repr(b), "<%s>" % clsname)
1023 raw.name = "dummy"
1024 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1025 raw.name = b"dummy"
1026 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1027
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001028 def test_recursive_repr(self):
1029 # Issue #25455
1030 raw = self.MockRawIO()
1031 b = self.tp(raw)
1032 with support.swap_attr(raw, 'name', b):
1033 try:
1034 repr(b) # Should not crash
1035 except RuntimeError:
1036 pass
1037
Antoine Pitrou6be88762010-05-03 16:48:20 +00001038 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001039 # Test that buffered file is closed despite failed flush
1040 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001041 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001042 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001043 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001044 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001045 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001046 raw.flush = bad_flush
1047 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001048 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001049 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001050 self.assertTrue(raw.closed)
1051 self.assertTrue(closed) # flush() called
1052 self.assertFalse(closed[0]) # flush() called before file closed
1053 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001054 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001055
1056 def test_close_error_on_close(self):
1057 raw = self.MockRawIO()
1058 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001059 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001060 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001061 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001062 raw.close = bad_close
1063 b = self.tp(raw)
1064 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001065 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001066 b.close()
1067 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001068 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001069 self.assertEqual(err.exception.__context__.args, ('flush',))
1070 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001071
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001072 def test_nonnormalized_close_error_on_close(self):
1073 # Issue #21677
1074 raw = self.MockRawIO()
1075 def bad_flush():
1076 raise non_existing_flush
1077 def bad_close():
1078 raise non_existing_close
1079 raw.close = bad_close
1080 b = self.tp(raw)
1081 b.flush = bad_flush
1082 with self.assertRaises(NameError) as err: # exception not swallowed
1083 b.close()
1084 self.assertIn('non_existing_close', str(err.exception))
1085 self.assertIsInstance(err.exception.__context__, NameError)
1086 self.assertIn('non_existing_flush', str(err.exception.__context__))
1087 self.assertFalse(b.closed)
1088
Antoine Pitrou6be88762010-05-03 16:48:20 +00001089 def test_multi_close(self):
1090 raw = self.MockRawIO()
1091 b = self.tp(raw)
1092 b.close()
1093 b.close()
1094 b.close()
1095 self.assertRaises(ValueError, b.flush)
1096
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001097 def test_unseekable(self):
1098 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1099 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1100 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1101
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001102 def test_readonly_attributes(self):
1103 raw = self.MockRawIO()
1104 buf = self.tp(raw)
1105 x = self.MockRawIO()
1106 with self.assertRaises(AttributeError):
1107 buf.raw = x
1108
Guido van Rossum78892e42007-04-06 17:31:18 +00001109
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001110class SizeofTest:
1111
1112 @support.cpython_only
1113 def test_sizeof(self):
1114 bufsize1 = 4096
1115 bufsize2 = 8192
1116 rawio = self.MockRawIO()
1117 bufio = self.tp(rawio, buffer_size=bufsize1)
1118 size = sys.getsizeof(bufio) - bufsize1
1119 rawio = self.MockRawIO()
1120 bufio = self.tp(rawio, buffer_size=bufsize2)
1121 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1122
Jesus Ceadc469452012-10-04 12:37:56 +02001123 @support.cpython_only
1124 def test_buffer_freeing(self) :
1125 bufsize = 4096
1126 rawio = self.MockRawIO()
1127 bufio = self.tp(rawio, buffer_size=bufsize)
1128 size = sys.getsizeof(bufio) - bufsize
1129 bufio.close()
1130 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1133 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001135 def test_constructor(self):
1136 rawio = self.MockRawIO([b"abc"])
1137 bufio = self.tp(rawio)
1138 bufio.__init__(rawio)
1139 bufio.__init__(rawio, buffer_size=1024)
1140 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001141 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001142 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1143 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1144 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1145 rawio = self.MockRawIO([b"abc"])
1146 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001147 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001148
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001149 def test_uninitialized(self):
1150 bufio = self.tp.__new__(self.tp)
1151 del bufio
1152 bufio = self.tp.__new__(self.tp)
1153 self.assertRaisesRegex((ValueError, AttributeError),
1154 'uninitialized|has no attribute',
1155 bufio.read, 0)
1156 bufio.__init__(self.MockRawIO())
1157 self.assertEqual(bufio.read(0), b'')
1158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001159 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001160 for arg in (None, 7):
1161 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1162 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001163 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001164 # Invalid args
1165 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001167 def test_read1(self):
1168 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1169 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001170 self.assertEqual(b"a", bufio.read(1))
1171 self.assertEqual(b"b", bufio.read1(1))
1172 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001173 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001174 self.assertEqual(b"c", bufio.read1(100))
1175 self.assertEqual(rawio._reads, 1)
1176 self.assertEqual(b"d", bufio.read1(100))
1177 self.assertEqual(rawio._reads, 2)
1178 self.assertEqual(b"efg", bufio.read1(100))
1179 self.assertEqual(rawio._reads, 3)
1180 self.assertEqual(b"", bufio.read1(100))
1181 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001182
1183 def test_read1_arbitrary(self):
1184 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1185 bufio = self.tp(rawio)
1186 self.assertEqual(b"a", bufio.read(1))
1187 self.assertEqual(b"bc", bufio.read1())
1188 self.assertEqual(b"d", bufio.read1())
1189 self.assertEqual(b"efg", bufio.read1(-1))
1190 self.assertEqual(rawio._reads, 3)
1191 self.assertEqual(b"", bufio.read1())
1192 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193
1194 def test_readinto(self):
1195 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1196 bufio = self.tp(rawio)
1197 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001198 self.assertEqual(bufio.readinto(b), 2)
1199 self.assertEqual(b, b"ab")
1200 self.assertEqual(bufio.readinto(b), 2)
1201 self.assertEqual(b, b"cd")
1202 self.assertEqual(bufio.readinto(b), 2)
1203 self.assertEqual(b, b"ef")
1204 self.assertEqual(bufio.readinto(b), 1)
1205 self.assertEqual(b, b"gf")
1206 self.assertEqual(bufio.readinto(b), 0)
1207 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001208 rawio = self.MockRawIO((b"abc", None))
1209 bufio = self.tp(rawio)
1210 self.assertEqual(bufio.readinto(b), 2)
1211 self.assertEqual(b, b"ab")
1212 self.assertEqual(bufio.readinto(b), 1)
1213 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001214
Benjamin Petersona96fea02014-06-22 14:17:44 -07001215 def test_readinto1(self):
1216 buffer_size = 10
1217 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1218 bufio = self.tp(rawio, buffer_size=buffer_size)
1219 b = bytearray(2)
1220 self.assertEqual(bufio.peek(3), b'abc')
1221 self.assertEqual(rawio._reads, 1)
1222 self.assertEqual(bufio.readinto1(b), 2)
1223 self.assertEqual(b, b"ab")
1224 self.assertEqual(rawio._reads, 1)
1225 self.assertEqual(bufio.readinto1(b), 1)
1226 self.assertEqual(b[:1], b"c")
1227 self.assertEqual(rawio._reads, 1)
1228 self.assertEqual(bufio.readinto1(b), 2)
1229 self.assertEqual(b, b"de")
1230 self.assertEqual(rawio._reads, 2)
1231 b = bytearray(2*buffer_size)
1232 self.assertEqual(bufio.peek(3), b'fgh')
1233 self.assertEqual(rawio._reads, 3)
1234 self.assertEqual(bufio.readinto1(b), 6)
1235 self.assertEqual(b[:6], b"fghjkl")
1236 self.assertEqual(rawio._reads, 4)
1237
1238 def test_readinto_array(self):
1239 buffer_size = 60
1240 data = b"a" * 26
1241 rawio = self.MockRawIO((data,))
1242 bufio = self.tp(rawio, buffer_size=buffer_size)
1243
1244 # Create an array with element size > 1 byte
1245 b = array.array('i', b'x' * 32)
1246 assert len(b) != 16
1247
1248 # Read into it. We should get as many *bytes* as we can fit into b
1249 # (which is more than the number of elements)
1250 n = bufio.readinto(b)
1251 self.assertGreater(n, len(b))
1252
1253 # Check that old contents of b are preserved
1254 bm = memoryview(b).cast('B')
1255 self.assertLess(n, len(bm))
1256 self.assertEqual(bm[:n], data[:n])
1257 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1258
1259 def test_readinto1_array(self):
1260 buffer_size = 60
1261 data = b"a" * 26
1262 rawio = self.MockRawIO((data,))
1263 bufio = self.tp(rawio, buffer_size=buffer_size)
1264
1265 # Create an array with element size > 1 byte
1266 b = array.array('i', b'x' * 32)
1267 assert len(b) != 16
1268
1269 # Read into it. We should get as many *bytes* as we can fit into b
1270 # (which is more than the number of elements)
1271 n = bufio.readinto1(b)
1272 self.assertGreater(n, len(b))
1273
1274 # Check that old contents of b are preserved
1275 bm = memoryview(b).cast('B')
1276 self.assertLess(n, len(bm))
1277 self.assertEqual(bm[:n], data[:n])
1278 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1279
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001280 def test_readlines(self):
1281 def bufio():
1282 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1283 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001284 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1285 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1286 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001287
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001289 data = b"abcdefghi"
1290 dlen = len(data)
1291
1292 tests = [
1293 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1294 [ 100, [ 3, 3, 3], [ dlen ] ],
1295 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1296 ]
1297
1298 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001299 rawio = self.MockFileIO(data)
1300 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001301 pos = 0
1302 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001303 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001304 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001305 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001306 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001307
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001309 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001310 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1311 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001312 self.assertEqual(b"abcd", bufio.read(6))
1313 self.assertEqual(b"e", bufio.read(1))
1314 self.assertEqual(b"fg", bufio.read())
1315 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001316 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001317 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001318
Victor Stinnera80987f2011-05-25 22:47:16 +02001319 rawio = self.MockRawIO((b"a", None, None))
1320 self.assertEqual(b"a", rawio.readall())
1321 self.assertIsNone(rawio.readall())
1322
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001323 def test_read_past_eof(self):
1324 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1325 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001326
Ezio Melottib3aedd42010-11-20 19:04:17 +00001327 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001328
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329 def test_read_all(self):
1330 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1331 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001332
Ezio Melottib3aedd42010-11-20 19:04:17 +00001333 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001334
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001335 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001337 try:
1338 # Write out many bytes with exactly the same number of 0's,
1339 # 1's... 255's. This will help us check that concurrent reading
1340 # doesn't duplicate or forget contents.
1341 N = 1000
1342 l = list(range(256)) * N
1343 random.shuffle(l)
1344 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001345 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001346 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001347 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001348 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001349 errors = []
1350 results = []
1351 def f():
1352 try:
1353 # Intra-buffer read then buffer-flushing read
1354 for n in cycle([1, 19]):
1355 s = bufio.read(n)
1356 if not s:
1357 break
1358 # list.append() is atomic
1359 results.append(s)
1360 except Exception as e:
1361 errors.append(e)
1362 raise
1363 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001364 with support.start_threads(threads):
1365 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001366 self.assertFalse(errors,
1367 "the following exceptions were caught: %r" % errors)
1368 s = b''.join(results)
1369 for i in range(256):
1370 c = bytes(bytearray([i]))
1371 self.assertEqual(s.count(c), N)
1372 finally:
1373 support.unlink(support.TESTFN)
1374
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001375 def test_unseekable(self):
1376 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1377 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1378 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1379 bufio.read(1)
1380 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1381 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1382
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383 def test_misbehaved_io(self):
1384 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1385 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001386 self.assertRaises(OSError, bufio.seek, 0)
1387 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001389 def test_no_extraneous_read(self):
1390 # Issue #9550; when the raw IO object has satisfied the read request,
1391 # we should not issue any additional reads, otherwise it may block
1392 # (e.g. socket).
1393 bufsize = 16
1394 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1395 rawio = self.MockRawIO([b"x" * n])
1396 bufio = self.tp(rawio, bufsize)
1397 self.assertEqual(bufio.read(n), b"x" * n)
1398 # Simple case: one raw read is enough to satisfy the request.
1399 self.assertEqual(rawio._extraneous_reads, 0,
1400 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1401 # A more complex case where two raw reads are needed to satisfy
1402 # the request.
1403 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1404 bufio = self.tp(rawio, bufsize)
1405 self.assertEqual(bufio.read(n), b"x" * n)
1406 self.assertEqual(rawio._extraneous_reads, 0,
1407 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1408
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001409 def test_read_on_closed(self):
1410 # Issue #23796
1411 b = io.BufferedReader(io.BytesIO(b"12"))
1412 b.read(1)
1413 b.close()
1414 self.assertRaises(ValueError, b.peek)
1415 self.assertRaises(ValueError, b.read1, 1)
1416
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001417
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001418class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001419 tp = io.BufferedReader
1420
1421 def test_constructor(self):
1422 BufferedReaderTest.test_constructor(self)
1423 # The allocation can succeed on 32-bit builds, e.g. with more
1424 # than 2GB RAM and a 64-bit kernel.
1425 if sys.maxsize > 0x7FFFFFFF:
1426 rawio = self.MockRawIO()
1427 bufio = self.tp(rawio)
1428 self.assertRaises((OverflowError, MemoryError, ValueError),
1429 bufio.__init__, rawio, sys.maxsize)
1430
1431 def test_initialization(self):
1432 rawio = self.MockRawIO([b"abc"])
1433 bufio = self.tp(rawio)
1434 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1435 self.assertRaises(ValueError, bufio.read)
1436 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1437 self.assertRaises(ValueError, bufio.read)
1438 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1439 self.assertRaises(ValueError, bufio.read)
1440
1441 def test_misbehaved_io_read(self):
1442 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1443 bufio = self.tp(rawio)
1444 # _pyio.BufferedReader seems to implement reading different, so that
1445 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001446 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001447
1448 def test_garbage_collection(self):
1449 # C BufferedReader objects are collected.
1450 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001451 with support.check_warnings(('', ResourceWarning)):
1452 rawio = self.FileIO(support.TESTFN, "w+b")
1453 f = self.tp(rawio)
1454 f.f = f
1455 wr = weakref.ref(f)
1456 del f
1457 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001458 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001459
R David Murray67bfe802013-02-23 21:51:05 -05001460 def test_args_error(self):
1461 # Issue #17275
1462 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1463 self.tp(io.BytesIO(), 1024, 1024, 1024)
1464
1465
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001466class PyBufferedReaderTest(BufferedReaderTest):
1467 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001468
Guido van Rossuma9e20242007-03-08 00:43:48 +00001469
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001470class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1471 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001473 def test_constructor(self):
1474 rawio = self.MockRawIO()
1475 bufio = self.tp(rawio)
1476 bufio.__init__(rawio)
1477 bufio.__init__(rawio, buffer_size=1024)
1478 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001479 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001480 bufio.flush()
1481 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1482 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1483 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1484 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001485 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001486 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001487 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001488
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001489 def test_uninitialized(self):
1490 bufio = self.tp.__new__(self.tp)
1491 del bufio
1492 bufio = self.tp.__new__(self.tp)
1493 self.assertRaisesRegex((ValueError, AttributeError),
1494 'uninitialized|has no attribute',
1495 bufio.write, b'')
1496 bufio.__init__(self.MockRawIO())
1497 self.assertEqual(bufio.write(b''), 0)
1498
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001499 def test_detach_flush(self):
1500 raw = self.MockRawIO()
1501 buf = self.tp(raw)
1502 buf.write(b"howdy!")
1503 self.assertFalse(raw._write_stack)
1504 buf.detach()
1505 self.assertEqual(raw._write_stack, [b"howdy!"])
1506
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001507 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001508 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001509 writer = self.MockRawIO()
1510 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001511 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001512 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001513 buffer = bytearray(b"def")
1514 bufio.write(buffer)
1515 buffer[:] = b"***" # Overwrite our copy of the data
1516 bufio.flush()
1517 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001518
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001519 def test_write_overflow(self):
1520 writer = self.MockRawIO()
1521 bufio = self.tp(writer, 8)
1522 contents = b"abcdefghijklmnop"
1523 for n in range(0, len(contents), 3):
1524 bufio.write(contents[n:n+3])
1525 flushed = b"".join(writer._write_stack)
1526 # At least (total - 8) bytes were implicitly flushed, perhaps more
1527 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001528 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001529
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001530 def check_writes(self, intermediate_func):
1531 # Lots of writes, test the flushed output is as expected.
1532 contents = bytes(range(256)) * 1000
1533 n = 0
1534 writer = self.MockRawIO()
1535 bufio = self.tp(writer, 13)
1536 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1537 def gen_sizes():
1538 for size in count(1):
1539 for i in range(15):
1540 yield size
1541 sizes = gen_sizes()
1542 while n < len(contents):
1543 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001544 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001545 intermediate_func(bufio)
1546 n += size
1547 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001548 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 def test_writes(self):
1551 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001552
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001553 def test_writes_and_flushes(self):
1554 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556 def test_writes_and_seeks(self):
1557 def _seekabs(bufio):
1558 pos = bufio.tell()
1559 bufio.seek(pos + 1, 0)
1560 bufio.seek(pos - 1, 0)
1561 bufio.seek(pos, 0)
1562 self.check_writes(_seekabs)
1563 def _seekrel(bufio):
1564 pos = bufio.seek(0, 1)
1565 bufio.seek(+1, 1)
1566 bufio.seek(-1, 1)
1567 bufio.seek(pos, 0)
1568 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001570 def test_writes_and_truncates(self):
1571 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001572
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001573 def test_write_non_blocking(self):
1574 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001575 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001576
Ezio Melottib3aedd42010-11-20 19:04:17 +00001577 self.assertEqual(bufio.write(b"abcd"), 4)
1578 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001579 # 1 byte will be written, the rest will be buffered
1580 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001581 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001582
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001583 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1584 raw.block_on(b"0")
1585 try:
1586 bufio.write(b"opqrwxyz0123456789")
1587 except self.BlockingIOError as e:
1588 written = e.characters_written
1589 else:
1590 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001591 self.assertEqual(written, 16)
1592 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001593 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001594
Ezio Melottib3aedd42010-11-20 19:04:17 +00001595 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001596 s = raw.pop_written()
1597 # Previously buffered bytes were flushed
1598 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001599
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001600 def test_write_and_rewind(self):
1601 raw = io.BytesIO()
1602 bufio = self.tp(raw, 4)
1603 self.assertEqual(bufio.write(b"abcdef"), 6)
1604 self.assertEqual(bufio.tell(), 6)
1605 bufio.seek(0, 0)
1606 self.assertEqual(bufio.write(b"XY"), 2)
1607 bufio.seek(6, 0)
1608 self.assertEqual(raw.getvalue(), b"XYcdef")
1609 self.assertEqual(bufio.write(b"123456"), 6)
1610 bufio.flush()
1611 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613 def test_flush(self):
1614 writer = self.MockRawIO()
1615 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001616 bufio.write(b"abc")
1617 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001618 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001619
Antoine Pitrou131a4892012-10-16 22:57:11 +02001620 def test_writelines(self):
1621 l = [b'ab', b'cd', b'ef']
1622 writer = self.MockRawIO()
1623 bufio = self.tp(writer, 8)
1624 bufio.writelines(l)
1625 bufio.flush()
1626 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1627
1628 def test_writelines_userlist(self):
1629 l = UserList([b'ab', b'cd', b'ef'])
1630 writer = self.MockRawIO()
1631 bufio = self.tp(writer, 8)
1632 bufio.writelines(l)
1633 bufio.flush()
1634 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1635
1636 def test_writelines_error(self):
1637 writer = self.MockRawIO()
1638 bufio = self.tp(writer, 8)
1639 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1640 self.assertRaises(TypeError, bufio.writelines, None)
1641 self.assertRaises(TypeError, bufio.writelines, 'abc')
1642
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001643 def test_destructor(self):
1644 writer = self.MockRawIO()
1645 bufio = self.tp(writer, 8)
1646 bufio.write(b"abc")
1647 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001648 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001649 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001650
1651 def test_truncate(self):
1652 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001653 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001654 bufio = self.tp(raw, 8)
1655 bufio.write(b"abcdef")
1656 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001657 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001658 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001659 self.assertEqual(f.read(), b"abc")
1660
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001661 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001663 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001664 # Write out many bytes from many threads and test they were
1665 # all flushed.
1666 N = 1000
1667 contents = bytes(range(256)) * N
1668 sizes = cycle([1, 19])
1669 n = 0
1670 queue = deque()
1671 while n < len(contents):
1672 size = next(sizes)
1673 queue.append(contents[n:n+size])
1674 n += size
1675 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001676 # We use a real file object because it allows us to
1677 # exercise situations where the GIL is released before
1678 # writing the buffer to the raw streams. This is in addition
1679 # to concurrency issues due to switching threads in the middle
1680 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001681 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001682 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001683 errors = []
1684 def f():
1685 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001686 while True:
1687 try:
1688 s = queue.popleft()
1689 except IndexError:
1690 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001691 bufio.write(s)
1692 except Exception as e:
1693 errors.append(e)
1694 raise
1695 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001696 with support.start_threads(threads):
1697 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001698 self.assertFalse(errors,
1699 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001700 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001701 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001702 s = f.read()
1703 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001704 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001705 finally:
1706 support.unlink(support.TESTFN)
1707
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001708 def test_misbehaved_io(self):
1709 rawio = self.MisbehavedRawIO()
1710 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001711 self.assertRaises(OSError, bufio.seek, 0)
1712 self.assertRaises(OSError, bufio.tell)
1713 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001714
Florent Xicluna109d5732012-07-07 17:03:22 +02001715 def test_max_buffer_size_removal(self):
1716 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001717 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001718
Benjamin Peterson68623612012-12-20 11:53:11 -06001719 def test_write_error_on_close(self):
1720 raw = self.MockRawIO()
1721 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001722 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001723 raw.write = bad_write
1724 b = self.tp(raw)
1725 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001726 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001727 self.assertTrue(b.closed)
1728
Benjamin Peterson59406a92009-03-26 17:10:29 +00001729
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001730class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001731 tp = io.BufferedWriter
1732
1733 def test_constructor(self):
1734 BufferedWriterTest.test_constructor(self)
1735 # The allocation can succeed on 32-bit builds, e.g. with more
1736 # than 2GB RAM and a 64-bit kernel.
1737 if sys.maxsize > 0x7FFFFFFF:
1738 rawio = self.MockRawIO()
1739 bufio = self.tp(rawio)
1740 self.assertRaises((OverflowError, MemoryError, ValueError),
1741 bufio.__init__, rawio, sys.maxsize)
1742
1743 def test_initialization(self):
1744 rawio = self.MockRawIO()
1745 bufio = self.tp(rawio)
1746 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1747 self.assertRaises(ValueError, bufio.write, b"def")
1748 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1749 self.assertRaises(ValueError, bufio.write, b"def")
1750 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1751 self.assertRaises(ValueError, bufio.write, b"def")
1752
1753 def test_garbage_collection(self):
1754 # C BufferedWriter objects are collected, and collecting them flushes
1755 # all data to disk.
1756 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001757 with support.check_warnings(('', ResourceWarning)):
1758 rawio = self.FileIO(support.TESTFN, "w+b")
1759 f = self.tp(rawio)
1760 f.write(b"123xxx")
1761 f.x = f
1762 wr = weakref.ref(f)
1763 del f
1764 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001765 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001766 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 self.assertEqual(f.read(), b"123xxx")
1768
R David Murray67bfe802013-02-23 21:51:05 -05001769 def test_args_error(self):
1770 # Issue #17275
1771 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1772 self.tp(io.BytesIO(), 1024, 1024, 1024)
1773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774
1775class PyBufferedWriterTest(BufferedWriterTest):
1776 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001777
Guido van Rossum01a27522007-03-07 01:00:12 +00001778class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001779
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001780 def test_constructor(self):
1781 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001782 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001783
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001784 def test_uninitialized(self):
1785 pair = self.tp.__new__(self.tp)
1786 del pair
1787 pair = self.tp.__new__(self.tp)
1788 self.assertRaisesRegex((ValueError, AttributeError),
1789 'uninitialized|has no attribute',
1790 pair.read, 0)
1791 self.assertRaisesRegex((ValueError, AttributeError),
1792 'uninitialized|has no attribute',
1793 pair.write, b'')
1794 pair.__init__(self.MockRawIO(), self.MockRawIO())
1795 self.assertEqual(pair.read(0), b'')
1796 self.assertEqual(pair.write(b''), 0)
1797
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001798 def test_detach(self):
1799 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1800 self.assertRaises(self.UnsupportedOperation, pair.detach)
1801
Florent Xicluna109d5732012-07-07 17:03:22 +02001802 def test_constructor_max_buffer_size_removal(self):
1803 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001804 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001805
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001806 def test_constructor_with_not_readable(self):
1807 class NotReadable(MockRawIO):
1808 def readable(self):
1809 return False
1810
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001811 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001812
1813 def test_constructor_with_not_writeable(self):
1814 class NotWriteable(MockRawIO):
1815 def writable(self):
1816 return False
1817
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001818 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001819
1820 def test_read(self):
1821 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1822
1823 self.assertEqual(pair.read(3), b"abc")
1824 self.assertEqual(pair.read(1), b"d")
1825 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001826 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1827 self.assertEqual(pair.read(None), b"abc")
1828
1829 def test_readlines(self):
1830 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1831 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1832 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1833 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001834
1835 def test_read1(self):
1836 # .read1() is delegated to the underlying reader object, so this test
1837 # can be shallow.
1838 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1839
1840 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001841 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001842
1843 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001844 for method in ("readinto", "readinto1"):
1845 with self.subTest(method):
1846 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001847
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001848 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001849 self.assertEqual(getattr(pair, method)(data), 5)
1850 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001851
1852 def test_write(self):
1853 w = self.MockRawIO()
1854 pair = self.tp(self.MockRawIO(), w)
1855
1856 pair.write(b"abc")
1857 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00001858 buffer = bytearray(b"def")
1859 pair.write(buffer)
1860 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001861 pair.flush()
1862 self.assertEqual(w._write_stack, [b"abc", b"def"])
1863
1864 def test_peek(self):
1865 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1866
1867 self.assertTrue(pair.peek(3).startswith(b"abc"))
1868 self.assertEqual(pair.read(3), b"abc")
1869
1870 def test_readable(self):
1871 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1872 self.assertTrue(pair.readable())
1873
1874 def test_writeable(self):
1875 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1876 self.assertTrue(pair.writable())
1877
1878 def test_seekable(self):
1879 # BufferedRWPairs are never seekable, even if their readers and writers
1880 # are.
1881 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1882 self.assertFalse(pair.seekable())
1883
1884 # .flush() is delegated to the underlying writer object and has been
1885 # tested in the test_write method.
1886
1887 def test_close_and_closed(self):
1888 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1889 self.assertFalse(pair.closed)
1890 pair.close()
1891 self.assertTrue(pair.closed)
1892
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001893 def test_reader_close_error_on_close(self):
1894 def reader_close():
1895 reader_non_existing
1896 reader = self.MockRawIO()
1897 reader.close = reader_close
1898 writer = self.MockRawIO()
1899 pair = self.tp(reader, writer)
1900 with self.assertRaises(NameError) as err:
1901 pair.close()
1902 self.assertIn('reader_non_existing', str(err.exception))
1903 self.assertTrue(pair.closed)
1904 self.assertFalse(reader.closed)
1905 self.assertTrue(writer.closed)
1906
1907 def test_writer_close_error_on_close(self):
1908 def writer_close():
1909 writer_non_existing
1910 reader = self.MockRawIO()
1911 writer = self.MockRawIO()
1912 writer.close = writer_close
1913 pair = self.tp(reader, writer)
1914 with self.assertRaises(NameError) as err:
1915 pair.close()
1916 self.assertIn('writer_non_existing', str(err.exception))
1917 self.assertFalse(pair.closed)
1918 self.assertTrue(reader.closed)
1919 self.assertFalse(writer.closed)
1920
1921 def test_reader_writer_close_error_on_close(self):
1922 def reader_close():
1923 reader_non_existing
1924 def writer_close():
1925 writer_non_existing
1926 reader = self.MockRawIO()
1927 reader.close = reader_close
1928 writer = self.MockRawIO()
1929 writer.close = writer_close
1930 pair = self.tp(reader, writer)
1931 with self.assertRaises(NameError) as err:
1932 pair.close()
1933 self.assertIn('reader_non_existing', str(err.exception))
1934 self.assertIsInstance(err.exception.__context__, NameError)
1935 self.assertIn('writer_non_existing', str(err.exception.__context__))
1936 self.assertFalse(pair.closed)
1937 self.assertFalse(reader.closed)
1938 self.assertFalse(writer.closed)
1939
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001940 def test_isatty(self):
1941 class SelectableIsAtty(MockRawIO):
1942 def __init__(self, isatty):
1943 MockRawIO.__init__(self)
1944 self._isatty = isatty
1945
1946 def isatty(self):
1947 return self._isatty
1948
1949 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1950 self.assertFalse(pair.isatty())
1951
1952 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1953 self.assertTrue(pair.isatty())
1954
1955 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1956 self.assertTrue(pair.isatty())
1957
1958 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1959 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001960
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001961 def test_weakref_clearing(self):
1962 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1963 ref = weakref.ref(brw)
1964 brw = None
1965 ref = None # Shouldn't segfault.
1966
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001967class CBufferedRWPairTest(BufferedRWPairTest):
1968 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001969
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970class PyBufferedRWPairTest(BufferedRWPairTest):
1971 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001973
1974class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1975 read_mode = "rb+"
1976 write_mode = "wb+"
1977
1978 def test_constructor(self):
1979 BufferedReaderTest.test_constructor(self)
1980 BufferedWriterTest.test_constructor(self)
1981
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001982 def test_uninitialized(self):
1983 BufferedReaderTest.test_uninitialized(self)
1984 BufferedWriterTest.test_uninitialized(self)
1985
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001986 def test_read_and_write(self):
1987 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001988 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001989
1990 self.assertEqual(b"as", rw.read(2))
1991 rw.write(b"ddd")
1992 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001993 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001994 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001995 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001996
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001997 def test_seek_and_tell(self):
1998 raw = self.BytesIO(b"asdfghjkl")
1999 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002000
Ezio Melottib3aedd42010-11-20 19:04:17 +00002001 self.assertEqual(b"as", rw.read(2))
2002 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002003 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002004 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002005
Antoine Pitroue05565e2011-08-20 14:39:23 +02002006 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002007 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002008 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002009 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002010 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002011 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002012 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(7, rw.tell())
2014 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002015 rw.flush()
2016 self.assertEqual(b"asdf123fl", raw.getvalue())
2017
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002018 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002019
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002020 def check_flush_and_read(self, read_func):
2021 raw = self.BytesIO(b"abcdefghi")
2022 bufio = self.tp(raw)
2023
Ezio Melottib3aedd42010-11-20 19:04:17 +00002024 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002026 self.assertEqual(b"ef", read_func(bufio, 2))
2027 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002028 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002029 self.assertEqual(6, bufio.tell())
2030 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002031 raw.seek(0, 0)
2032 raw.write(b"XYZ")
2033 # flush() resets the read buffer
2034 bufio.flush()
2035 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002036 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037
2038 def test_flush_and_read(self):
2039 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2040
2041 def test_flush_and_readinto(self):
2042 def _readinto(bufio, n=-1):
2043 b = bytearray(n if n >= 0 else 9999)
2044 n = bufio.readinto(b)
2045 return bytes(b[:n])
2046 self.check_flush_and_read(_readinto)
2047
2048 def test_flush_and_peek(self):
2049 def _peek(bufio, n=-1):
2050 # This relies on the fact that the buffer can contain the whole
2051 # raw stream, otherwise peek() can return less.
2052 b = bufio.peek(n)
2053 if n != -1:
2054 b = b[:n]
2055 bufio.seek(len(b), 1)
2056 return b
2057 self.check_flush_and_read(_peek)
2058
2059 def test_flush_and_write(self):
2060 raw = self.BytesIO(b"abcdefghi")
2061 bufio = self.tp(raw)
2062
2063 bufio.write(b"123")
2064 bufio.flush()
2065 bufio.write(b"45")
2066 bufio.flush()
2067 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002068 self.assertEqual(b"12345fghi", raw.getvalue())
2069 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002070
2071 def test_threads(self):
2072 BufferedReaderTest.test_threads(self)
2073 BufferedWriterTest.test_threads(self)
2074
2075 def test_writes_and_peek(self):
2076 def _peek(bufio):
2077 bufio.peek(1)
2078 self.check_writes(_peek)
2079 def _peek(bufio):
2080 pos = bufio.tell()
2081 bufio.seek(-1, 1)
2082 bufio.peek(1)
2083 bufio.seek(pos, 0)
2084 self.check_writes(_peek)
2085
2086 def test_writes_and_reads(self):
2087 def _read(bufio):
2088 bufio.seek(-1, 1)
2089 bufio.read(1)
2090 self.check_writes(_read)
2091
2092 def test_writes_and_read1s(self):
2093 def _read1(bufio):
2094 bufio.seek(-1, 1)
2095 bufio.read1(1)
2096 self.check_writes(_read1)
2097
2098 def test_writes_and_readintos(self):
2099 def _read(bufio):
2100 bufio.seek(-1, 1)
2101 bufio.readinto(bytearray(1))
2102 self.check_writes(_read)
2103
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002104 def test_write_after_readahead(self):
2105 # Issue #6629: writing after the buffer was filled by readahead should
2106 # first rewind the raw stream.
2107 for overwrite_size in [1, 5]:
2108 raw = self.BytesIO(b"A" * 10)
2109 bufio = self.tp(raw, 4)
2110 # Trigger readahead
2111 self.assertEqual(bufio.read(1), b"A")
2112 self.assertEqual(bufio.tell(), 1)
2113 # Overwriting should rewind the raw stream if it needs so
2114 bufio.write(b"B" * overwrite_size)
2115 self.assertEqual(bufio.tell(), overwrite_size + 1)
2116 # If the write size was smaller than the buffer size, flush() and
2117 # check that rewind happens.
2118 bufio.flush()
2119 self.assertEqual(bufio.tell(), overwrite_size + 1)
2120 s = raw.getvalue()
2121 self.assertEqual(s,
2122 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2123
Antoine Pitrou7c404892011-05-13 00:13:33 +02002124 def test_write_rewind_write(self):
2125 # Various combinations of reading / writing / seeking backwards / writing again
2126 def mutate(bufio, pos1, pos2):
2127 assert pos2 >= pos1
2128 # Fill the buffer
2129 bufio.seek(pos1)
2130 bufio.read(pos2 - pos1)
2131 bufio.write(b'\x02')
2132 # This writes earlier than the previous write, but still inside
2133 # the buffer.
2134 bufio.seek(pos1)
2135 bufio.write(b'\x01')
2136
2137 b = b"\x80\x81\x82\x83\x84"
2138 for i in range(0, len(b)):
2139 for j in range(i, len(b)):
2140 raw = self.BytesIO(b)
2141 bufio = self.tp(raw, 100)
2142 mutate(bufio, i, j)
2143 bufio.flush()
2144 expected = bytearray(b)
2145 expected[j] = 2
2146 expected[i] = 1
2147 self.assertEqual(raw.getvalue(), expected,
2148 "failed result for i=%d, j=%d" % (i, j))
2149
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002150 def test_truncate_after_read_or_write(self):
2151 raw = self.BytesIO(b"A" * 10)
2152 bufio = self.tp(raw, 100)
2153 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2154 self.assertEqual(bufio.truncate(), 2)
2155 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2156 self.assertEqual(bufio.truncate(), 4)
2157
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002158 def test_misbehaved_io(self):
2159 BufferedReaderTest.test_misbehaved_io(self)
2160 BufferedWriterTest.test_misbehaved_io(self)
2161
Antoine Pitroue05565e2011-08-20 14:39:23 +02002162 def test_interleaved_read_write(self):
2163 # Test for issue #12213
2164 with self.BytesIO(b'abcdefgh') as raw:
2165 with self.tp(raw, 100) as f:
2166 f.write(b"1")
2167 self.assertEqual(f.read(1), b'b')
2168 f.write(b'2')
2169 self.assertEqual(f.read1(1), b'd')
2170 f.write(b'3')
2171 buf = bytearray(1)
2172 f.readinto(buf)
2173 self.assertEqual(buf, b'f')
2174 f.write(b'4')
2175 self.assertEqual(f.peek(1), b'h')
2176 f.flush()
2177 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2178
2179 with self.BytesIO(b'abc') as raw:
2180 with self.tp(raw, 100) as f:
2181 self.assertEqual(f.read(1), b'a')
2182 f.write(b"2")
2183 self.assertEqual(f.read(1), b'c')
2184 f.flush()
2185 self.assertEqual(raw.getvalue(), b'a2c')
2186
2187 def test_interleaved_readline_write(self):
2188 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2189 with self.tp(raw) as f:
2190 f.write(b'1')
2191 self.assertEqual(f.readline(), b'b\n')
2192 f.write(b'2')
2193 self.assertEqual(f.readline(), b'def\n')
2194 f.write(b'3')
2195 self.assertEqual(f.readline(), b'\n')
2196 f.flush()
2197 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2198
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002199 # You can't construct a BufferedRandom over a non-seekable stream.
2200 test_unseekable = None
2201
R David Murray67bfe802013-02-23 21:51:05 -05002202
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002203class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002204 tp = io.BufferedRandom
2205
2206 def test_constructor(self):
2207 BufferedRandomTest.test_constructor(self)
2208 # The allocation can succeed on 32-bit builds, e.g. with more
2209 # than 2GB RAM and a 64-bit kernel.
2210 if sys.maxsize > 0x7FFFFFFF:
2211 rawio = self.MockRawIO()
2212 bufio = self.tp(rawio)
2213 self.assertRaises((OverflowError, MemoryError, ValueError),
2214 bufio.__init__, rawio, sys.maxsize)
2215
2216 def test_garbage_collection(self):
2217 CBufferedReaderTest.test_garbage_collection(self)
2218 CBufferedWriterTest.test_garbage_collection(self)
2219
R David Murray67bfe802013-02-23 21:51:05 -05002220 def test_args_error(self):
2221 # Issue #17275
2222 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2223 self.tp(io.BytesIO(), 1024, 1024, 1024)
2224
2225
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002226class PyBufferedRandomTest(BufferedRandomTest):
2227 tp = pyio.BufferedRandom
2228
2229
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002230# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2231# properties:
2232# - A single output character can correspond to many bytes of input.
2233# - The number of input bytes to complete the character can be
2234# undetermined until the last input byte is received.
2235# - The number of input bytes can vary depending on previous input.
2236# - A single input byte can correspond to many characters of output.
2237# - The number of output characters can be undetermined until the
2238# last input byte is received.
2239# - The number of output characters can vary depending on previous input.
2240
2241class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2242 """
2243 For testing seek/tell behavior with a stateful, buffering decoder.
2244
2245 Input is a sequence of words. Words may be fixed-length (length set
2246 by input) or variable-length (period-terminated). In variable-length
2247 mode, extra periods are ignored. Possible words are:
2248 - 'i' followed by a number sets the input length, I (maximum 99).
2249 When I is set to 0, words are space-terminated.
2250 - 'o' followed by a number sets the output length, O (maximum 99).
2251 - Any other word is converted into a word followed by a period on
2252 the output. The output word consists of the input word truncated
2253 or padded out with hyphens to make its length equal to O. If O
2254 is 0, the word is output verbatim without truncating or padding.
2255 I and O are initially set to 1. When I changes, any buffered input is
2256 re-scanned according to the new I. EOF also terminates the last word.
2257 """
2258
2259 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002260 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002261 self.reset()
2262
2263 def __repr__(self):
2264 return '<SID %x>' % id(self)
2265
2266 def reset(self):
2267 self.i = 1
2268 self.o = 1
2269 self.buffer = bytearray()
2270
2271 def getstate(self):
2272 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2273 return bytes(self.buffer), i*100 + o
2274
2275 def setstate(self, state):
2276 buffer, io = state
2277 self.buffer = bytearray(buffer)
2278 i, o = divmod(io, 100)
2279 self.i, self.o = i ^ 1, o ^ 1
2280
2281 def decode(self, input, final=False):
2282 output = ''
2283 for b in input:
2284 if self.i == 0: # variable-length, terminated with period
2285 if b == ord('.'):
2286 if self.buffer:
2287 output += self.process_word()
2288 else:
2289 self.buffer.append(b)
2290 else: # fixed-length, terminate after self.i bytes
2291 self.buffer.append(b)
2292 if len(self.buffer) == self.i:
2293 output += self.process_word()
2294 if final and self.buffer: # EOF terminates the last word
2295 output += self.process_word()
2296 return output
2297
2298 def process_word(self):
2299 output = ''
2300 if self.buffer[0] == ord('i'):
2301 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2302 elif self.buffer[0] == ord('o'):
2303 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2304 else:
2305 output = self.buffer.decode('ascii')
2306 if len(output) < self.o:
2307 output += '-'*self.o # pad out with hyphens
2308 if self.o:
2309 output = output[:self.o] # truncate to output length
2310 output += '.'
2311 self.buffer = bytearray()
2312 return output
2313
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002314 codecEnabled = False
2315
2316 @classmethod
2317 def lookupTestDecoder(cls, name):
2318 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002319 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002320 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002321 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002322 incrementalencoder=None,
2323 streamreader=None, streamwriter=None,
2324 incrementaldecoder=cls)
2325
2326# Register the previous decoder for testing.
2327# Disabled by default, tests will enable it.
2328codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2329
2330
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002331class StatefulIncrementalDecoderTest(unittest.TestCase):
2332 """
2333 Make sure the StatefulIncrementalDecoder actually works.
2334 """
2335
2336 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002337 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002338 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002339 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002340 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002341 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002342 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002343 # I=0, O=6 (variable-length input, fixed-length output)
2344 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2345 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002346 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002347 # I=6, O=3 (fixed-length input > fixed-length output)
2348 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2349 # I=0, then 3; O=29, then 15 (with longer output)
2350 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2351 'a----------------------------.' +
2352 'b----------------------------.' +
2353 'cde--------------------------.' +
2354 'abcdefghijabcde.' +
2355 'a.b------------.' +
2356 '.c.------------.' +
2357 'd.e------------.' +
2358 'k--------------.' +
2359 'l--------------.' +
2360 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002361 ]
2362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002363 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002364 # Try a few one-shot test cases.
2365 for input, eof, output in self.test_cases:
2366 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002367 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002368
2369 # Also test an unfinished decode, followed by forcing EOF.
2370 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002371 self.assertEqual(d.decode(b'oiabcd'), '')
2372 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002373
2374class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002375
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002376 def setUp(self):
2377 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2378 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002379 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002380
Guido van Rossumd0712812007-04-11 16:32:43 +00002381 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002382 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002383
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384 def test_constructor(self):
2385 r = self.BytesIO(b"\xc3\xa9\n\n")
2386 b = self.BufferedReader(r, 1000)
2387 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002388 t.__init__(b, encoding="latin-1", newline="\r\n")
2389 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002390 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002391 t.__init__(b, encoding="utf-8", line_buffering=True)
2392 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002393 self.assertEqual(t.line_buffering, True)
2394 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002395 self.assertRaises(TypeError, t.__init__, b, newline=42)
2396 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2397
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002398 def test_uninitialized(self):
2399 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2400 del t
2401 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2402 self.assertRaises(Exception, repr, t)
2403 self.assertRaisesRegex((ValueError, AttributeError),
2404 'uninitialized|has no attribute',
2405 t.read, 0)
2406 t.__init__(self.MockRawIO())
2407 self.assertEqual(t.read(0), '')
2408
Nick Coghlana9b15242014-02-04 22:11:18 +10002409 def test_non_text_encoding_codecs_are_rejected(self):
2410 # Ensure the constructor complains if passed a codec that isn't
2411 # marked as a text encoding
2412 # http://bugs.python.org/issue20404
2413 r = self.BytesIO()
2414 b = self.BufferedWriter(r)
2415 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2416 self.TextIOWrapper(b, encoding="hex")
2417
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002418 def test_detach(self):
2419 r = self.BytesIO()
2420 b = self.BufferedWriter(r)
2421 t = self.TextIOWrapper(b)
2422 self.assertIs(t.detach(), b)
2423
2424 t = self.TextIOWrapper(b, encoding="ascii")
2425 t.write("howdy")
2426 self.assertFalse(r.getvalue())
2427 t.detach()
2428 self.assertEqual(r.getvalue(), b"howdy")
2429 self.assertRaises(ValueError, t.detach)
2430
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002431 # Operations independent of the detached stream should still work
2432 repr(t)
2433 self.assertEqual(t.encoding, "ascii")
2434 self.assertEqual(t.errors, "strict")
2435 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002436 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002437
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002438 def test_repr(self):
2439 raw = self.BytesIO("hello".encode("utf-8"))
2440 b = self.BufferedReader(raw)
2441 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002442 modname = self.TextIOWrapper.__module__
2443 self.assertEqual(repr(t),
2444 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2445 raw.name = "dummy"
2446 self.assertEqual(repr(t),
2447 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002448 t.mode = "r"
2449 self.assertEqual(repr(t),
2450 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002451 raw.name = b"dummy"
2452 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002453 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002454
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002455 t.buffer.detach()
2456 repr(t) # Should not raise an exception
2457
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002458 def test_recursive_repr(self):
2459 # Issue #25455
2460 raw = self.BytesIO()
2461 t = self.TextIOWrapper(raw)
2462 with support.swap_attr(raw, 'name', t):
2463 try:
2464 repr(t) # Should not crash
2465 except RuntimeError:
2466 pass
2467
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002468 def test_line_buffering(self):
2469 r = self.BytesIO()
2470 b = self.BufferedWriter(r, 1000)
2471 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002472 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002473 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002474 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002476 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002477 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002478
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002479 def test_reconfigure_line_buffering(self):
2480 r = self.BytesIO()
2481 b = self.BufferedWriter(r, 1000)
2482 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2483 t.write("AB\nC")
2484 self.assertEqual(r.getvalue(), b"")
2485
2486 t.reconfigure(line_buffering=True) # implicit flush
2487 self.assertEqual(r.getvalue(), b"AB\nC")
2488 t.write("DEF\nG")
2489 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2490 t.write("H")
2491 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2492 t.reconfigure(line_buffering=False) # implicit flush
2493 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2494 t.write("IJ")
2495 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2496
2497 # Keeping default value
2498 t.reconfigure()
2499 t.reconfigure(line_buffering=None)
2500 self.assertEqual(t.line_buffering, False)
2501 t.reconfigure(line_buffering=True)
2502 t.reconfigure()
2503 t.reconfigure(line_buffering=None)
2504 self.assertEqual(t.line_buffering, True)
2505
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002506 def test_default_encoding(self):
2507 old_environ = dict(os.environ)
2508 try:
2509 # try to get a user preferred encoding different than the current
2510 # locale encoding to check that TextIOWrapper() uses the current
2511 # locale encoding and not the user preferred encoding
2512 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2513 if key in os.environ:
2514 del os.environ[key]
2515
2516 current_locale_encoding = locale.getpreferredencoding(False)
2517 b = self.BytesIO()
2518 t = self.TextIOWrapper(b)
2519 self.assertEqual(t.encoding, current_locale_encoding)
2520 finally:
2521 os.environ.clear()
2522 os.environ.update(old_environ)
2523
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002524 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002525 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002526 # Issue 15989
2527 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002528 b = self.BytesIO()
2529 b.fileno = lambda: _testcapi.INT_MAX + 1
2530 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2531 b.fileno = lambda: _testcapi.UINT_MAX + 1
2532 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002534 def test_encoding(self):
2535 # Check the encoding attribute is always set, and valid
2536 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002537 t = self.TextIOWrapper(b, encoding="utf-8")
2538 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002539 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002540 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002541 codecs.lookup(t.encoding)
2542
2543 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002544 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002545 b = self.BytesIO(b"abc\n\xff\n")
2546 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002547 self.assertRaises(UnicodeError, t.read)
2548 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002549 b = self.BytesIO(b"abc\n\xff\n")
2550 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002551 self.assertRaises(UnicodeError, t.read)
2552 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002553 b = self.BytesIO(b"abc\n\xff\n")
2554 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002555 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002556 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002557 b = self.BytesIO(b"abc\n\xff\n")
2558 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002559 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002560
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002561 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002562 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002563 b = self.BytesIO()
2564 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002565 self.assertRaises(UnicodeError, t.write, "\xff")
2566 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002567 b = self.BytesIO()
2568 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002569 self.assertRaises(UnicodeError, t.write, "\xff")
2570 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002571 b = self.BytesIO()
2572 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002573 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002574 t.write("abc\xffdef\n")
2575 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002576 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002577 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002578 b = self.BytesIO()
2579 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002580 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002581 t.write("abc\xffdef\n")
2582 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002583 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002584
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002585 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002586 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2587
2588 tests = [
2589 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002590 [ '', input_lines ],
2591 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2592 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2593 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002594 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002595 encodings = (
2596 'utf-8', 'latin-1',
2597 'utf-16', 'utf-16-le', 'utf-16-be',
2598 'utf-32', 'utf-32-le', 'utf-32-be',
2599 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002600
Guido van Rossum8358db22007-08-18 21:39:55 +00002601 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002602 # character in TextIOWrapper._pending_line.
2603 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002604 # XXX: str.encode() should return bytes
2605 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002606 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002607 for bufsize in range(1, 10):
2608 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002609 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2610 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002611 encoding=encoding)
2612 if do_reads:
2613 got_lines = []
2614 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002615 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002616 if c2 == '':
2617 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002618 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002619 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002620 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002621 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002622
2623 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(got_line, exp_line)
2625 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002627 def test_newlines_input(self):
2628 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002629 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2630 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002631 (None, normalized.decode("ascii").splitlines(keepends=True)),
2632 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002633 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2634 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2635 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002636 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637 buf = self.BytesIO(testdata)
2638 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002639 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002640 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002641 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002642
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002643 def test_newlines_output(self):
2644 testdict = {
2645 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2646 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2647 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2648 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2649 }
2650 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2651 for newline, expected in tests:
2652 buf = self.BytesIO()
2653 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2654 txt.write("AAA\nB")
2655 txt.write("BB\nCCC\n")
2656 txt.write("X\rY\r\nZ")
2657 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002658 self.assertEqual(buf.closed, False)
2659 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002660
2661 def test_destructor(self):
2662 l = []
2663 base = self.BytesIO
2664 class MyBytesIO(base):
2665 def close(self):
2666 l.append(self.getvalue())
2667 base.close(self)
2668 b = MyBytesIO()
2669 t = self.TextIOWrapper(b, encoding="ascii")
2670 t.write("abc")
2671 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002672 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002673 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674
2675 def test_override_destructor(self):
2676 record = []
2677 class MyTextIO(self.TextIOWrapper):
2678 def __del__(self):
2679 record.append(1)
2680 try:
2681 f = super().__del__
2682 except AttributeError:
2683 pass
2684 else:
2685 f()
2686 def close(self):
2687 record.append(2)
2688 super().close()
2689 def flush(self):
2690 record.append(3)
2691 super().flush()
2692 b = self.BytesIO()
2693 t = MyTextIO(b, encoding="ascii")
2694 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002695 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002696 self.assertEqual(record, [1, 2, 3])
2697
2698 def test_error_through_destructor(self):
2699 # Test that the exception state is not modified by a destructor,
2700 # even if close() fails.
2701 rawio = self.CloseFailureIO()
2702 def f():
2703 self.TextIOWrapper(rawio).xyzzy
2704 with support.captured_output("stderr") as s:
2705 self.assertRaises(AttributeError, f)
2706 s = s.getvalue().strip()
2707 if s:
2708 # The destructor *may* have printed an unraisable error, check it
2709 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002710 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002711 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002712
Guido van Rossum9b76da62007-04-11 01:09:03 +00002713 # Systematic tests of the text I/O API
2714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002715 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002716 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002717 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002718 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002719 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002720 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002721 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002723 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002724 self.assertEqual(f.tell(), 0)
2725 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002726 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(f.seek(0), 0)
2728 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002729 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002730 self.assertEqual(f.read(2), "ab")
2731 self.assertEqual(f.read(1), "c")
2732 self.assertEqual(f.read(1), "")
2733 self.assertEqual(f.read(), "")
2734 self.assertEqual(f.tell(), cookie)
2735 self.assertEqual(f.seek(0), 0)
2736 self.assertEqual(f.seek(0, 2), cookie)
2737 self.assertEqual(f.write("def"), 3)
2738 self.assertEqual(f.seek(cookie), cookie)
2739 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002740 if enc.startswith("utf"):
2741 self.multi_line_test(f, enc)
2742 f.close()
2743
2744 def multi_line_test(self, f, enc):
2745 f.seek(0)
2746 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002747 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002748 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002749 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002750 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002751 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002752 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002753 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002754 wlines.append((f.tell(), line))
2755 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002756 f.seek(0)
2757 rlines = []
2758 while True:
2759 pos = f.tell()
2760 line = f.readline()
2761 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002762 break
2763 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002764 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002765
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002766 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002767 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002768 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002769 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002770 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002771 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002772 p2 = f.tell()
2773 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002774 self.assertEqual(f.tell(), p0)
2775 self.assertEqual(f.readline(), "\xff\n")
2776 self.assertEqual(f.tell(), p1)
2777 self.assertEqual(f.readline(), "\xff\n")
2778 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002779 f.seek(0)
2780 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002781 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002782 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002783 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002784 f.close()
2785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 def test_seeking(self):
2787 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002788 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002789 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002790 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002791 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002792 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002793 suffix = bytes(u_suffix.encode("utf-8"))
2794 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002795 with self.open(support.TESTFN, "wb") as f:
2796 f.write(line*2)
2797 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2798 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002799 self.assertEqual(s, str(prefix, "ascii"))
2800 self.assertEqual(f.tell(), prefix_size)
2801 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002802
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002803 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002804 # Regression test for a specific bug
2805 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002806 with self.open(support.TESTFN, "wb") as f:
2807 f.write(data)
2808 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2809 f._CHUNK_SIZE # Just test that it exists
2810 f._CHUNK_SIZE = 2
2811 f.readline()
2812 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002813
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002814 def test_seek_and_tell(self):
2815 #Test seek/tell using the StatefulIncrementalDecoder.
2816 # Make test faster by doing smaller seeks
2817 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002818
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002819 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002820 """Tell/seek to various points within a data stream and ensure
2821 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002822 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002823 f.write(data)
2824 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002825 f = self.open(support.TESTFN, encoding='test_decoder')
2826 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002827 decoded = f.read()
2828 f.close()
2829
Neal Norwitze2b07052008-03-18 19:52:05 +00002830 for i in range(min_pos, len(decoded) + 1): # seek positions
2831 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002833 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002834 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002835 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002836 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002837 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002838 f.close()
2839
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002840 # Enable the test decoder.
2841 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002842
2843 # Run the tests.
2844 try:
2845 # Try each test case.
2846 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002847 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002848
2849 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002850 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2851 offset = CHUNK_SIZE - len(input)//2
2852 prefix = b'.'*offset
2853 # Don't bother seeking into the prefix (takes too long).
2854 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002855 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002856
2857 # Ensure our test decoder won't interfere with subsequent tests.
2858 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002859 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002860
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002861 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002862 data = "1234567890"
2863 tests = ("utf-16",
2864 "utf-16-le",
2865 "utf-16-be",
2866 "utf-32",
2867 "utf-32-le",
2868 "utf-32-be")
2869 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002870 buf = self.BytesIO()
2871 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002872 # Check if the BOM is written only once (see issue1753).
2873 f.write(data)
2874 f.write(data)
2875 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002876 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002877 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002878 self.assertEqual(f.read(), data * 2)
2879 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002880
Benjamin Petersona1b49012009-03-31 23:11:32 +00002881 def test_unreadable(self):
2882 class UnReadable(self.BytesIO):
2883 def readable(self):
2884 return False
2885 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002886 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002887
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002888 def test_read_one_by_one(self):
2889 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002890 reads = ""
2891 while True:
2892 c = txt.read(1)
2893 if not c:
2894 break
2895 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002896 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002897
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002898 def test_readlines(self):
2899 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2900 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2901 txt.seek(0)
2902 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2903 txt.seek(0)
2904 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2905
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002906 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002907 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002908 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002909 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002910 reads = ""
2911 while True:
2912 c = txt.read(128)
2913 if not c:
2914 break
2915 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002916 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002917
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002918 def test_writelines(self):
2919 l = ['ab', 'cd', 'ef']
2920 buf = self.BytesIO()
2921 txt = self.TextIOWrapper(buf)
2922 txt.writelines(l)
2923 txt.flush()
2924 self.assertEqual(buf.getvalue(), b'abcdef')
2925
2926 def test_writelines_userlist(self):
2927 l = UserList(['ab', 'cd', 'ef'])
2928 buf = self.BytesIO()
2929 txt = self.TextIOWrapper(buf)
2930 txt.writelines(l)
2931 txt.flush()
2932 self.assertEqual(buf.getvalue(), b'abcdef')
2933
2934 def test_writelines_error(self):
2935 txt = self.TextIOWrapper(self.BytesIO())
2936 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2937 self.assertRaises(TypeError, txt.writelines, None)
2938 self.assertRaises(TypeError, txt.writelines, b'abc')
2939
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002940 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002941 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002942
2943 # read one char at a time
2944 reads = ""
2945 while True:
2946 c = txt.read(1)
2947 if not c:
2948 break
2949 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002950 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002951
2952 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002953 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002954 txt._CHUNK_SIZE = 4
2955
2956 reads = ""
2957 while True:
2958 c = txt.read(4)
2959 if not c:
2960 break
2961 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002962 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002963
2964 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002965 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002966 txt._CHUNK_SIZE = 4
2967
2968 reads = txt.read(4)
2969 reads += txt.read(4)
2970 reads += txt.readline()
2971 reads += txt.readline()
2972 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002973 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002974
2975 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002976 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002977 txt._CHUNK_SIZE = 4
2978
2979 reads = txt.read(4)
2980 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002981 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002982
2983 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002984 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002985 txt._CHUNK_SIZE = 4
2986
2987 reads = txt.read(4)
2988 pos = txt.tell()
2989 txt.seek(0)
2990 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002991 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002992
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002993 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002994 buffer = self.BytesIO(self.testdata)
2995 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002996
2997 self.assertEqual(buffer.seekable(), txt.seekable())
2998
Antoine Pitroue4501852009-05-14 18:55:55 +00002999 def test_append_bom(self):
3000 # The BOM is not written again when appending to a non-empty file
3001 filename = support.TESTFN
3002 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3003 with self.open(filename, 'w', encoding=charset) as f:
3004 f.write('aaa')
3005 pos = f.tell()
3006 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003007 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003008
3009 with self.open(filename, 'a', encoding=charset) as f:
3010 f.write('xxx')
3011 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003012 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003013
3014 def test_seek_bom(self):
3015 # Same test, but when seeking manually
3016 filename = support.TESTFN
3017 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3018 with self.open(filename, 'w', encoding=charset) as f:
3019 f.write('aaa')
3020 pos = f.tell()
3021 with self.open(filename, 'r+', encoding=charset) as f:
3022 f.seek(pos)
3023 f.write('zzz')
3024 f.seek(0)
3025 f.write('bbb')
3026 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003027 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003028
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003029 def test_seek_append_bom(self):
3030 # Same test, but first seek to the start and then to the end
3031 filename = support.TESTFN
3032 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3033 with self.open(filename, 'w', encoding=charset) as f:
3034 f.write('aaa')
3035 with self.open(filename, 'a', encoding=charset) as f:
3036 f.seek(0)
3037 f.seek(0, self.SEEK_END)
3038 f.write('xxx')
3039 with self.open(filename, 'rb') as f:
3040 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3041
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003042 def test_errors_property(self):
3043 with self.open(support.TESTFN, "w") as f:
3044 self.assertEqual(f.errors, "strict")
3045 with self.open(support.TESTFN, "w", errors="replace") as f:
3046 self.assertEqual(f.errors, "replace")
3047
Brett Cannon31f59292011-02-21 19:29:56 +00003048 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003049 def test_threads_write(self):
3050 # Issue6750: concurrent writes could duplicate data
3051 event = threading.Event()
3052 with self.open(support.TESTFN, "w", buffering=1) as f:
3053 def run(n):
3054 text = "Thread%03d\n" % n
3055 event.wait()
3056 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003057 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003058 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003059 with support.start_threads(threads, event.set):
3060 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003061 with self.open(support.TESTFN) as f:
3062 content = f.read()
3063 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003064 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003065
Antoine Pitrou6be88762010-05-03 16:48:20 +00003066 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003067 # Test that text file is closed despite failed flush
3068 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003069 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003070 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003071 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003072 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003073 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003074 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003075 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003076 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003077 self.assertTrue(txt.buffer.closed)
3078 self.assertTrue(closed) # flush() called
3079 self.assertFalse(closed[0]) # flush() called before file closed
3080 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003081 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003082
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003083 def test_close_error_on_close(self):
3084 buffer = self.BytesIO(self.testdata)
3085 def bad_flush():
3086 raise OSError('flush')
3087 def bad_close():
3088 raise OSError('close')
3089 buffer.close = bad_close
3090 txt = self.TextIOWrapper(buffer, encoding="ascii")
3091 txt.flush = bad_flush
3092 with self.assertRaises(OSError) as err: # exception not swallowed
3093 txt.close()
3094 self.assertEqual(err.exception.args, ('close',))
3095 self.assertIsInstance(err.exception.__context__, OSError)
3096 self.assertEqual(err.exception.__context__.args, ('flush',))
3097 self.assertFalse(txt.closed)
3098
3099 def test_nonnormalized_close_error_on_close(self):
3100 # Issue #21677
3101 buffer = self.BytesIO(self.testdata)
3102 def bad_flush():
3103 raise non_existing_flush
3104 def bad_close():
3105 raise non_existing_close
3106 buffer.close = bad_close
3107 txt = self.TextIOWrapper(buffer, encoding="ascii")
3108 txt.flush = bad_flush
3109 with self.assertRaises(NameError) as err: # exception not swallowed
3110 txt.close()
3111 self.assertIn('non_existing_close', str(err.exception))
3112 self.assertIsInstance(err.exception.__context__, NameError)
3113 self.assertIn('non_existing_flush', str(err.exception.__context__))
3114 self.assertFalse(txt.closed)
3115
Antoine Pitrou6be88762010-05-03 16:48:20 +00003116 def test_multi_close(self):
3117 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3118 txt.close()
3119 txt.close()
3120 txt.close()
3121 self.assertRaises(ValueError, txt.flush)
3122
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003123 def test_unseekable(self):
3124 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3125 self.assertRaises(self.UnsupportedOperation, txt.tell)
3126 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3127
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003128 def test_readonly_attributes(self):
3129 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3130 buf = self.BytesIO(self.testdata)
3131 with self.assertRaises(AttributeError):
3132 txt.buffer = buf
3133
Antoine Pitroue96ec682011-07-23 21:46:35 +02003134 def test_rawio(self):
3135 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3136 # that subprocess.Popen() can have the required unbuffered
3137 # semantics with universal_newlines=True.
3138 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3139 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3140 # Reads
3141 self.assertEqual(txt.read(4), 'abcd')
3142 self.assertEqual(txt.readline(), 'efghi\n')
3143 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3144
3145 def test_rawio_write_through(self):
3146 # Issue #12591: with write_through=True, writes don't need a flush
3147 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3148 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3149 write_through=True)
3150 txt.write('1')
3151 txt.write('23\n4')
3152 txt.write('5')
3153 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3154
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003155 def test_bufio_write_through(self):
3156 # Issue #21396: write_through=True doesn't force a flush()
3157 # on the underlying binary buffered object.
3158 flush_called, write_called = [], []
3159 class BufferedWriter(self.BufferedWriter):
3160 def flush(self, *args, **kwargs):
3161 flush_called.append(True)
3162 return super().flush(*args, **kwargs)
3163 def write(self, *args, **kwargs):
3164 write_called.append(True)
3165 return super().write(*args, **kwargs)
3166
3167 rawio = self.BytesIO()
3168 data = b"a"
3169 bufio = BufferedWriter(rawio, len(data)*2)
3170 textio = self.TextIOWrapper(bufio, encoding='ascii',
3171 write_through=True)
3172 # write to the buffered io but don't overflow the buffer
3173 text = data.decode('ascii')
3174 textio.write(text)
3175
3176 # buffer.flush is not called with write_through=True
3177 self.assertFalse(flush_called)
3178 # buffer.write *is* called with write_through=True
3179 self.assertTrue(write_called)
3180 self.assertEqual(rawio.getvalue(), b"") # no flush
3181
3182 write_called = [] # reset
3183 textio.write(text * 10) # total content is larger than bufio buffer
3184 self.assertTrue(write_called)
3185 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3186
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003187 def test_reconfigure_write_through(self):
3188 raw = self.MockRawIO([])
3189 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3190 t.write('1')
3191 t.reconfigure(write_through=True) # implied flush
3192 self.assertEqual(t.write_through, True)
3193 self.assertEqual(b''.join(raw._write_stack), b'1')
3194 t.write('23')
3195 self.assertEqual(b''.join(raw._write_stack), b'123')
3196 t.reconfigure(write_through=False)
3197 self.assertEqual(t.write_through, False)
3198 t.write('45')
3199 t.flush()
3200 self.assertEqual(b''.join(raw._write_stack), b'12345')
3201 # Keeping default value
3202 t.reconfigure()
3203 t.reconfigure(write_through=None)
3204 self.assertEqual(t.write_through, False)
3205 t.reconfigure(write_through=True)
3206 t.reconfigure()
3207 t.reconfigure(write_through=None)
3208 self.assertEqual(t.write_through, True)
3209
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003210 def test_read_nonbytes(self):
3211 # Issue #17106
3212 # Crash when underlying read() returns non-bytes
3213 t = self.TextIOWrapper(self.StringIO('a'))
3214 self.assertRaises(TypeError, t.read, 1)
3215 t = self.TextIOWrapper(self.StringIO('a'))
3216 self.assertRaises(TypeError, t.readline)
3217 t = self.TextIOWrapper(self.StringIO('a'))
3218 self.assertRaises(TypeError, t.read)
3219
Oren Milmana5b4ea12017-08-25 21:14:54 +03003220 def test_illegal_encoder(self):
3221 # Issue 31271: Calling write() while the return value of encoder's
3222 # encode() is invalid shouldn't cause an assertion failure.
3223 rot13 = codecs.lookup("rot13")
3224 with support.swap_attr(rot13, '_is_text_encoding', True):
3225 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3226 self.assertRaises(TypeError, t.write, 'bar')
3227
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003228 def test_illegal_decoder(self):
3229 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003230 # Bypass the early encoding check added in issue 20404
3231 def _make_illegal_wrapper():
3232 quopri = codecs.lookup("quopri")
3233 quopri._is_text_encoding = True
3234 try:
3235 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3236 newline='\n', encoding="quopri")
3237 finally:
3238 quopri._is_text_encoding = False
3239 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003240 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003241 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003242 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003243 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003244 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003245 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003246 self.assertRaises(TypeError, t.read)
3247
Oren Milmanba7d7362017-08-29 11:58:27 +03003248 # Issue 31243: calling read() while the return value of decoder's
3249 # getstate() is invalid should neither crash the interpreter nor
3250 # raise a SystemError.
3251 def _make_very_illegal_wrapper(getstate_ret_val):
3252 class BadDecoder:
3253 def getstate(self):
3254 return getstate_ret_val
3255 def _get_bad_decoder(dummy):
3256 return BadDecoder()
3257 quopri = codecs.lookup("quopri")
3258 with support.swap_attr(quopri, 'incrementaldecoder',
3259 _get_bad_decoder):
3260 return _make_illegal_wrapper()
3261 t = _make_very_illegal_wrapper(42)
3262 self.assertRaises(TypeError, t.read, 42)
3263 t = _make_very_illegal_wrapper(())
3264 self.assertRaises(TypeError, t.read, 42)
3265 t = _make_very_illegal_wrapper((1, 2))
3266 self.assertRaises(TypeError, t.read, 42)
3267
Antoine Pitrou712cb732013-12-21 15:51:54 +01003268 def _check_create_at_shutdown(self, **kwargs):
3269 # Issue #20037: creating a TextIOWrapper at shutdown
3270 # shouldn't crash the interpreter.
3271 iomod = self.io.__name__
3272 code = """if 1:
3273 import codecs
3274 import {iomod} as io
3275
3276 # Avoid looking up codecs at shutdown
3277 codecs.lookup('utf-8')
3278
3279 class C:
3280 def __init__(self):
3281 self.buf = io.BytesIO()
3282 def __del__(self):
3283 io.TextIOWrapper(self.buf, **{kwargs})
3284 print("ok")
3285 c = C()
3286 """.format(iomod=iomod, kwargs=kwargs)
3287 return assert_python_ok("-c", code)
3288
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003289 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003290 def test_create_at_shutdown_without_encoding(self):
3291 rc, out, err = self._check_create_at_shutdown()
3292 if err:
3293 # Can error out with a RuntimeError if the module state
3294 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003295 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003296 else:
3297 self.assertEqual("ok", out.decode().strip())
3298
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003299 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003300 def test_create_at_shutdown_with_encoding(self):
3301 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3302 errors='strict')
3303 self.assertFalse(err)
3304 self.assertEqual("ok", out.decode().strip())
3305
Antoine Pitroub8503892014-04-29 10:14:02 +02003306 def test_read_byteslike(self):
3307 r = MemviewBytesIO(b'Just some random string\n')
3308 t = self.TextIOWrapper(r, 'utf-8')
3309
3310 # TextIOwrapper will not read the full string, because
3311 # we truncate it to a multiple of the native int size
3312 # so that we can construct a more complex memoryview.
3313 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3314
3315 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3316
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003317 def test_issue22849(self):
3318 class F(object):
3319 def readable(self): return True
3320 def writable(self): return True
3321 def seekable(self): return True
3322
3323 for i in range(10):
3324 try:
3325 self.TextIOWrapper(F(), encoding='utf-8')
3326 except Exception:
3327 pass
3328
3329 F.tell = lambda x: 0
3330 t = self.TextIOWrapper(F(), encoding='utf-8')
3331
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003332
Antoine Pitroub8503892014-04-29 10:14:02 +02003333class MemviewBytesIO(io.BytesIO):
3334 '''A BytesIO object whose read method returns memoryviews
3335 rather than bytes'''
3336
3337 def read1(self, len_):
3338 return _to_memoryview(super().read1(len_))
3339
3340 def read(self, len_):
3341 return _to_memoryview(super().read(len_))
3342
3343def _to_memoryview(buf):
3344 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3345
3346 arr = array.array('i')
3347 idx = len(buf) - len(buf) % arr.itemsize
3348 arr.frombytes(buf[:idx])
3349 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003350
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003351
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003352class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003353 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003354 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003355
3356 def test_initialization(self):
3357 r = self.BytesIO(b"\xc3\xa9\n\n")
3358 b = self.BufferedReader(r, 1000)
3359 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003360 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3361 self.assertRaises(ValueError, t.read)
3362
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003363 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3364 self.assertRaises(Exception, repr, t)
3365
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003366 def test_garbage_collection(self):
3367 # C TextIOWrapper objects are collected, and collecting them flushes
3368 # all data to disk.
3369 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003370 with support.check_warnings(('', ResourceWarning)):
3371 rawio = io.FileIO(support.TESTFN, "wb")
3372 b = self.BufferedWriter(rawio)
3373 t = self.TextIOWrapper(b, encoding="ascii")
3374 t.write("456def")
3375 t.x = t
3376 wr = weakref.ref(t)
3377 del t
3378 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003379 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003380 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003381 self.assertEqual(f.read(), b"456def")
3382
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003383 def test_rwpair_cleared_before_textio(self):
3384 # Issue 13070: TextIOWrapper's finalization would crash when called
3385 # after the reference to the underlying BufferedRWPair's writer got
3386 # cleared by the GC.
3387 for i in range(1000):
3388 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3389 t1 = self.TextIOWrapper(b1, encoding="ascii")
3390 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3391 t2 = self.TextIOWrapper(b2, encoding="ascii")
3392 # circular references
3393 t1.buddy = t2
3394 t2.buddy = t1
3395 support.gc_collect()
3396
3397
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003398class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003399 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003400 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003401
3402
3403class IncrementalNewlineDecoderTest(unittest.TestCase):
3404
3405 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003406 # UTF-8 specific tests for a newline decoder
3407 def _check_decode(b, s, **kwargs):
3408 # We exercise getstate() / setstate() as well as decode()
3409 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003410 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003411 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003412 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003413
Antoine Pitrou180a3362008-12-14 16:36:46 +00003414 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003415
Antoine Pitrou180a3362008-12-14 16:36:46 +00003416 _check_decode(b'\xe8', "")
3417 _check_decode(b'\xa2', "")
3418 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003419
Antoine Pitrou180a3362008-12-14 16:36:46 +00003420 _check_decode(b'\xe8', "")
3421 _check_decode(b'\xa2', "")
3422 _check_decode(b'\x88', "\u8888")
3423
3424 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003425 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3426
Antoine Pitrou180a3362008-12-14 16:36:46 +00003427 decoder.reset()
3428 _check_decode(b'\n', "\n")
3429 _check_decode(b'\r', "")
3430 _check_decode(b'', "\n", final=True)
3431 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003432
Antoine Pitrou180a3362008-12-14 16:36:46 +00003433 _check_decode(b'\r', "")
3434 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003435
Antoine Pitrou180a3362008-12-14 16:36:46 +00003436 _check_decode(b'\r\r\n', "\n\n")
3437 _check_decode(b'\r', "")
3438 _check_decode(b'\r', "\n")
3439 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003440
Antoine Pitrou180a3362008-12-14 16:36:46 +00003441 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3442 _check_decode(b'\xe8\xa2\x88', "\u8888")
3443 _check_decode(b'\n', "\n")
3444 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3445 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003446
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003447 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003448 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003449 if encoding is not None:
3450 encoder = codecs.getincrementalencoder(encoding)()
3451 def _decode_bytewise(s):
3452 # Decode one byte at a time
3453 for b in encoder.encode(s):
3454 result.append(decoder.decode(bytes([b])))
3455 else:
3456 encoder = None
3457 def _decode_bytewise(s):
3458 # Decode one char at a time
3459 for c in s:
3460 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003461 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003462 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003463 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003464 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003465 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003466 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003467 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003468 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003469 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003470 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003471 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003472 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003473 input = "abc"
3474 if encoder is not None:
3475 encoder.reset()
3476 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003477 self.assertEqual(decoder.decode(input), "abc")
3478 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003479
3480 def test_newline_decoder(self):
3481 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003482 # None meaning the IncrementalNewlineDecoder takes unicode input
3483 # rather than bytes input
3484 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003485 'utf-16', 'utf-16-le', 'utf-16-be',
3486 'utf-32', 'utf-32-le', 'utf-32-be',
3487 )
3488 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003489 decoder = enc and codecs.getincrementaldecoder(enc)()
3490 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3491 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003492 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003493 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3494 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003495 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003496
Antoine Pitrou66913e22009-03-06 23:40:56 +00003497 def test_newline_bytes(self):
3498 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3499 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003500 self.assertEqual(dec.newlines, None)
3501 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3502 self.assertEqual(dec.newlines, None)
3503 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3504 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003505 dec = self.IncrementalNewlineDecoder(None, translate=False)
3506 _check(dec)
3507 dec = self.IncrementalNewlineDecoder(None, translate=True)
3508 _check(dec)
3509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003510class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3511 pass
3512
3513class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3514 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003515
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003516
Guido van Rossum01a27522007-03-07 01:00:12 +00003517# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003518
Guido van Rossum5abbf752007-08-27 17:39:33 +00003519class MiscIOTest(unittest.TestCase):
3520
Barry Warsaw40e82462008-11-20 20:14:50 +00003521 def tearDown(self):
3522 support.unlink(support.TESTFN)
3523
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003524 def test___all__(self):
3525 for name in self.io.__all__:
3526 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003527 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003528 if name == "open":
3529 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003530 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003531 self.assertTrue(issubclass(obj, Exception), name)
3532 elif not name.startswith("SEEK_"):
3533 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003534
Barry Warsaw40e82462008-11-20 20:14:50 +00003535 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003536 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003537 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003538 f.close()
3539
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003540 with support.check_warnings(('', DeprecationWarning)):
3541 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003542 self.assertEqual(f.name, support.TESTFN)
3543 self.assertEqual(f.buffer.name, support.TESTFN)
3544 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3545 self.assertEqual(f.mode, "U")
3546 self.assertEqual(f.buffer.mode, "rb")
3547 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003548 f.close()
3549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003550 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003551 self.assertEqual(f.mode, "w+")
3552 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3553 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003554
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003555 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003556 self.assertEqual(g.mode, "wb")
3557 self.assertEqual(g.raw.mode, "wb")
3558 self.assertEqual(g.name, f.fileno())
3559 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003560 f.close()
3561 g.close()
3562
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003563 def test_io_after_close(self):
3564 for kwargs in [
3565 {"mode": "w"},
3566 {"mode": "wb"},
3567 {"mode": "w", "buffering": 1},
3568 {"mode": "w", "buffering": 2},
3569 {"mode": "wb", "buffering": 0},
3570 {"mode": "r"},
3571 {"mode": "rb"},
3572 {"mode": "r", "buffering": 1},
3573 {"mode": "r", "buffering": 2},
3574 {"mode": "rb", "buffering": 0},
3575 {"mode": "w+"},
3576 {"mode": "w+b"},
3577 {"mode": "w+", "buffering": 1},
3578 {"mode": "w+", "buffering": 2},
3579 {"mode": "w+b", "buffering": 0},
3580 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003581 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003582 f.close()
3583 self.assertRaises(ValueError, f.flush)
3584 self.assertRaises(ValueError, f.fileno)
3585 self.assertRaises(ValueError, f.isatty)
3586 self.assertRaises(ValueError, f.__iter__)
3587 if hasattr(f, "peek"):
3588 self.assertRaises(ValueError, f.peek, 1)
3589 self.assertRaises(ValueError, f.read)
3590 if hasattr(f, "read1"):
3591 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003592 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003593 if hasattr(f, "readall"):
3594 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003595 if hasattr(f, "readinto"):
3596 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003597 if hasattr(f, "readinto1"):
3598 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003599 self.assertRaises(ValueError, f.readline)
3600 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003601 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003602 self.assertRaises(ValueError, f.seek, 0)
3603 self.assertRaises(ValueError, f.tell)
3604 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003605 self.assertRaises(ValueError, f.write,
3606 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003607 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003608 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003610 def test_blockingioerror(self):
3611 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003612 class C(str):
3613 pass
3614 c = C("")
3615 b = self.BlockingIOError(1, c)
3616 c.b = b
3617 b.c = c
3618 wr = weakref.ref(c)
3619 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003620 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003621 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003622
3623 def test_abcs(self):
3624 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003625 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3626 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3627 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3628 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003629
3630 def _check_abc_inheritance(self, abcmodule):
3631 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003632 self.assertIsInstance(f, abcmodule.IOBase)
3633 self.assertIsInstance(f, abcmodule.RawIOBase)
3634 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3635 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003636 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003637 self.assertIsInstance(f, abcmodule.IOBase)
3638 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3639 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3640 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003641 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003642 self.assertIsInstance(f, abcmodule.IOBase)
3643 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3644 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3645 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003646
3647 def test_abc_inheritance(self):
3648 # Test implementations inherit from their respective ABCs
3649 self._check_abc_inheritance(self)
3650
3651 def test_abc_inheritance_official(self):
3652 # Test implementations inherit from the official ABCs of the
3653 # baseline "io" module.
3654 self._check_abc_inheritance(io)
3655
Antoine Pitroue033e062010-10-29 10:38:18 +00003656 def _check_warn_on_dealloc(self, *args, **kwargs):
3657 f = open(*args, **kwargs)
3658 r = repr(f)
3659 with self.assertWarns(ResourceWarning) as cm:
3660 f = None
3661 support.gc_collect()
3662 self.assertIn(r, str(cm.warning.args[0]))
3663
3664 def test_warn_on_dealloc(self):
3665 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3666 self._check_warn_on_dealloc(support.TESTFN, "wb")
3667 self._check_warn_on_dealloc(support.TESTFN, "w")
3668
3669 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3670 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003671 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003672 for fd in fds:
3673 try:
3674 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003675 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003676 if e.errno != errno.EBADF:
3677 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003678 self.addCleanup(cleanup_fds)
3679 r, w = os.pipe()
3680 fds += r, w
3681 self._check_warn_on_dealloc(r, *args, **kwargs)
3682 # When using closefd=False, there's no warning
3683 r, w = os.pipe()
3684 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003685 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003686 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003687
3688 def test_warn_on_dealloc_fd(self):
3689 self._check_warn_on_dealloc_fd("rb", buffering=0)
3690 self._check_warn_on_dealloc_fd("rb")
3691 self._check_warn_on_dealloc_fd("r")
3692
3693
Antoine Pitrou243757e2010-11-05 21:15:39 +00003694 def test_pickling(self):
3695 # Pickling file objects is forbidden
3696 for kwargs in [
3697 {"mode": "w"},
3698 {"mode": "wb"},
3699 {"mode": "wb", "buffering": 0},
3700 {"mode": "r"},
3701 {"mode": "rb"},
3702 {"mode": "rb", "buffering": 0},
3703 {"mode": "w+"},
3704 {"mode": "w+b"},
3705 {"mode": "w+b", "buffering": 0},
3706 ]:
3707 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3708 with self.open(support.TESTFN, **kwargs) as f:
3709 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3710
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003711 def test_nonblock_pipe_write_bigbuf(self):
3712 self._test_nonblock_pipe_write(16*1024)
3713
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003714 def test_nonblock_pipe_write_smallbuf(self):
3715 self._test_nonblock_pipe_write(1024)
3716
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003717 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3718 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003719 def _test_nonblock_pipe_write(self, bufsize):
3720 sent = []
3721 received = []
3722 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003723 os.set_blocking(r, False)
3724 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003725
3726 # To exercise all code paths in the C implementation we need
3727 # to play with buffer sizes. For instance, if we choose a
3728 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3729 # then we will never get a partial write of the buffer.
3730 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3731 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3732
3733 with rf, wf:
3734 for N in 9999, 73, 7574:
3735 try:
3736 i = 0
3737 while True:
3738 msg = bytes([i % 26 + 97]) * N
3739 sent.append(msg)
3740 wf.write(msg)
3741 i += 1
3742
3743 except self.BlockingIOError as e:
3744 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003745 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003746 sent[-1] = sent[-1][:e.characters_written]
3747 received.append(rf.read())
3748 msg = b'BLOCKED'
3749 wf.write(msg)
3750 sent.append(msg)
3751
3752 while True:
3753 try:
3754 wf.flush()
3755 break
3756 except self.BlockingIOError as e:
3757 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003758 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003759 self.assertEqual(e.characters_written, 0)
3760 received.append(rf.read())
3761
3762 received += iter(rf.read, None)
3763
3764 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003765 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003766 self.assertTrue(wf.closed)
3767 self.assertTrue(rf.closed)
3768
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003769 def test_create_fail(self):
3770 # 'x' mode fails if file is existing
3771 with self.open(support.TESTFN, 'w'):
3772 pass
3773 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3774
3775 def test_create_writes(self):
3776 # 'x' mode opens for writing
3777 with self.open(support.TESTFN, 'xb') as f:
3778 f.write(b"spam")
3779 with self.open(support.TESTFN, 'rb') as f:
3780 self.assertEqual(b"spam", f.read())
3781
Christian Heimes7b648752012-09-10 14:48:43 +02003782 def test_open_allargs(self):
3783 # there used to be a buffer overflow in the parser for rawmode
3784 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3785
3786
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003787class CMiscIOTest(MiscIOTest):
3788 io = io
3789
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003790 def test_readinto_buffer_overflow(self):
3791 # Issue #18025
3792 class BadReader(self.io.BufferedIOBase):
3793 def read(self, n=-1):
3794 return b'x' * 10**6
3795 bufio = BadReader()
3796 b = bytearray(2)
3797 self.assertRaises(ValueError, bufio.readinto, b)
3798
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003799 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3800 # Issue #23309: deadlocks at shutdown should be avoided when a
3801 # daemon thread and the main thread both write to a file.
3802 code = """if 1:
3803 import sys
3804 import time
3805 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02003806 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003807
3808 file = sys.{stream_name}
3809
3810 def run():
3811 while True:
3812 file.write('.')
3813 file.flush()
3814
Victor Stinner2a1aed02017-04-21 17:59:23 +02003815 crash = SuppressCrashReport()
3816 crash.__enter__()
3817 # don't call __exit__(): the crash occurs at Python shutdown
3818
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003819 thread = threading.Thread(target=run)
3820 thread.daemon = True
3821 thread.start()
3822
3823 time.sleep(0.5)
3824 file.write('!')
3825 file.flush()
3826 """.format_map(locals())
3827 res, _ = run_python_until_end("-c", code)
3828 err = res.err.decode()
3829 if res.rc != 0:
3830 # Failure: should be a fatal error
3831 self.assertIn("Fatal Python error: could not acquire lock "
3832 "for <_io.BufferedWriter name='<{stream_name}>'> "
3833 "at interpreter shutdown, possibly due to "
3834 "daemon threads".format_map(locals()),
3835 err)
3836 else:
3837 self.assertFalse(err.strip('.!'))
3838
3839 def test_daemon_threads_shutdown_stdout_deadlock(self):
3840 self.check_daemon_threads_shutdown_deadlock('stdout')
3841
3842 def test_daemon_threads_shutdown_stderr_deadlock(self):
3843 self.check_daemon_threads_shutdown_deadlock('stderr')
3844
3845
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003846class PyMiscIOTest(MiscIOTest):
3847 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003848
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003849
3850@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3851class SignalsTest(unittest.TestCase):
3852
3853 def setUp(self):
3854 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3855
3856 def tearDown(self):
3857 signal.signal(signal.SIGALRM, self.oldalrm)
3858
3859 def alarm_interrupt(self, sig, frame):
3860 1/0
3861
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003862 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3863 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003864 invokes the signal handler, and bubbles up the exception raised
3865 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003866 read_results = []
3867 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003868 if hasattr(signal, 'pthread_sigmask'):
3869 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003870 s = os.read(r, 1)
3871 read_results.append(s)
3872 t = threading.Thread(target=_read)
3873 t.daemon = True
3874 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003875 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003876 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003877 try:
3878 wio = self.io.open(w, **fdopen_kwargs)
3879 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003880 # Fill the pipe enough that the write will be blocking.
3881 # It will be interrupted by the timer armed above. Since the
3882 # other thread has read one byte, the low-level write will
3883 # return with a successful (partial) result rather than an EINTR.
3884 # The buffered IO layer must check for pending signal
3885 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003886 signal.alarm(1)
3887 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003888 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003889 finally:
3890 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003891 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003892 # We got one byte, get another one and check that it isn't a
3893 # repeat of the first one.
3894 read_results.append(os.read(r, 1))
3895 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3896 finally:
3897 os.close(w)
3898 os.close(r)
3899 # This is deliberate. If we didn't close the file descriptor
3900 # before closing wio, wio would try to flush its internal
3901 # buffer, and block again.
3902 try:
3903 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003904 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003905 if e.errno != errno.EBADF:
3906 raise
3907
3908 def test_interrupted_write_unbuffered(self):
3909 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3910
3911 def test_interrupted_write_buffered(self):
3912 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3913
Victor Stinner6ab72862014-09-03 23:32:28 +02003914 # Issue #22331: The test hangs on FreeBSD 7.2
3915 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003916 def test_interrupted_write_text(self):
3917 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3918
Brett Cannon31f59292011-02-21 19:29:56 +00003919 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003920 def check_reentrant_write(self, data, **fdopen_kwargs):
3921 def on_alarm(*args):
3922 # Will be called reentrantly from the same thread
3923 wio.write(data)
3924 1/0
3925 signal.signal(signal.SIGALRM, on_alarm)
3926 r, w = os.pipe()
3927 wio = self.io.open(w, **fdopen_kwargs)
3928 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003929 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003930 # Either the reentrant call to wio.write() fails with RuntimeError,
3931 # or the signal handler raises ZeroDivisionError.
3932 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3933 while 1:
3934 for i in range(100):
3935 wio.write(data)
3936 wio.flush()
3937 # Make sure the buffer doesn't fill up and block further writes
3938 os.read(r, len(data) * 100)
3939 exc = cm.exception
3940 if isinstance(exc, RuntimeError):
3941 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3942 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07003943 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003944 wio.close()
3945 os.close(r)
3946
3947 def test_reentrant_write_buffered(self):
3948 self.check_reentrant_write(b"xy", mode="wb")
3949
3950 def test_reentrant_write_text(self):
3951 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3952
Antoine Pitrou707ce822011-02-25 21:24:11 +00003953 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3954 """Check that a buffered read, when it gets interrupted (either
3955 returning a partial result or EINTR), properly invokes the signal
3956 handler and retries if the latter returned successfully."""
3957 r, w = os.pipe()
3958 fdopen_kwargs["closefd"] = False
3959 def alarm_handler(sig, frame):
3960 os.write(w, b"bar")
3961 signal.signal(signal.SIGALRM, alarm_handler)
3962 try:
3963 rio = self.io.open(r, **fdopen_kwargs)
3964 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003965 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003966 # Expected behaviour:
3967 # - first raw read() returns partial b"foo"
3968 # - second raw read() returns EINTR
3969 # - third raw read() returns b"bar"
3970 self.assertEqual(decode(rio.read(6)), "foobar")
3971 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07003972 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003973 rio.close()
3974 os.close(w)
3975 os.close(r)
3976
Antoine Pitrou20db5112011-08-19 20:32:34 +02003977 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003978 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3979 mode="rb")
3980
Antoine Pitrou20db5112011-08-19 20:32:34 +02003981 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003982 self.check_interrupted_read_retry(lambda x: x,
3983 mode="r")
3984
Antoine Pitrou707ce822011-02-25 21:24:11 +00003985 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3986 """Check that a buffered write, when it gets interrupted (either
3987 returning a partial result or EINTR), properly invokes the signal
3988 handler and retries if the latter returned successfully."""
3989 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003990
Antoine Pitrou707ce822011-02-25 21:24:11 +00003991 # A quantity that exceeds the buffer size of an anonymous pipe's
3992 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003993 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003994 r, w = os.pipe()
3995 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003996
Antoine Pitrou707ce822011-02-25 21:24:11 +00003997 # We need a separate thread to read from the pipe and allow the
3998 # write() to finish. This thread is started after the SIGALRM is
3999 # received (forcing a first EINTR in write()).
4000 read_results = []
4001 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004002 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004003 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004004 try:
4005 while not write_finished:
4006 while r in select.select([r], [], [], 1.0)[0]:
4007 s = os.read(r, 1024)
4008 read_results.append(s)
4009 except BaseException as exc:
4010 nonlocal error
4011 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004012 t = threading.Thread(target=_read)
4013 t.daemon = True
4014 def alarm1(sig, frame):
4015 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004016 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004017 def alarm2(sig, frame):
4018 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004019
4020 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004021 signal.signal(signal.SIGALRM, alarm1)
4022 try:
4023 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004024 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004025 # Expected behaviour:
4026 # - first raw write() is partial (because of the limited pipe buffer
4027 # and the first alarm)
4028 # - second raw write() returns EINTR (because of the second alarm)
4029 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004030 written = wio.write(large_data)
4031 self.assertEqual(N, written)
4032
Antoine Pitrou707ce822011-02-25 21:24:11 +00004033 wio.flush()
4034 write_finished = True
4035 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004036
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004037 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004038 self.assertEqual(N, sum(len(x) for x in read_results))
4039 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004040 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004041 write_finished = True
4042 os.close(w)
4043 os.close(r)
4044 # This is deliberate. If we didn't close the file descriptor
4045 # before closing wio, wio would try to flush its internal
4046 # buffer, and could block (in case of failure).
4047 try:
4048 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004049 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004050 if e.errno != errno.EBADF:
4051 raise
4052
Antoine Pitrou20db5112011-08-19 20:32:34 +02004053 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004054 self.check_interrupted_write_retry(b"x", mode="wb")
4055
Antoine Pitrou20db5112011-08-19 20:32:34 +02004056 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004057 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4058
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004059
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004060class CSignalsTest(SignalsTest):
4061 io = io
4062
4063class PySignalsTest(SignalsTest):
4064 io = pyio
4065
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004066 # Handling reentrancy issues would slow down _pyio even more, so the
4067 # tests are disabled.
4068 test_reentrant_write_buffered = None
4069 test_reentrant_write_text = None
4070
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004071
Ezio Melottidaa42c72013-03-23 16:30:16 +02004072def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004073 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004074 CBufferedReaderTest, PyBufferedReaderTest,
4075 CBufferedWriterTest, PyBufferedWriterTest,
4076 CBufferedRWPairTest, PyBufferedRWPairTest,
4077 CBufferedRandomTest, PyBufferedRandomTest,
4078 StatefulIncrementalDecoderTest,
4079 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4080 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004081 CMiscIOTest, PyMiscIOTest,
4082 CSignalsTest, PySignalsTest,
4083 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004084
4085 # Put the namespaces of the IO module we are testing and some useful mock
4086 # classes in the __dict__ of each test.
4087 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00004088 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004089 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4090 c_io_ns = {name : getattr(io, name) for name in all_members}
4091 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4092 globs = globals()
4093 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4094 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4095 # Avoid turning open into a bound method.
4096 py_io_ns["open"] = pyio.OpenWrapper
4097 for test in tests:
4098 if test.__name__.startswith("C"):
4099 for name, obj in c_io_ns.items():
4100 setattr(test, name, obj)
4101 elif test.__name__.startswith("Py"):
4102 for name, obj in py_io_ns.items():
4103 setattr(test, name, obj)
4104
Ezio Melottidaa42c72013-03-23 16:30:16 +02004105 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4106 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004107
4108if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004109 unittest.main()