blob: 1dab166817b86e69987f8b6f7683ddc5e4273d5f [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000048def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000050 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051 return f._CHUNK_SIZE
52
53
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately has no read() method.

    Leaving read() out exercises the default RawIO.read(), which calls
    readinto().  Incoming data is taken from ``read_stack`` (a sequence of
    bytes objects or None); everything written is recorded in
    ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0              # number of read attempts
        self._extraneous_reads = 0   # attempts made after the script ran out

    def write(self, b):
        # Store an immutable copy so later changes to ``b`` are not seen.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk into ``buf``.

        Returns the number of bytes copied, 0 once the script is exhausted,
        or None when the next scripted entry is None.
        """
        self._reads += 1
        capacity = len(buf)
        try:
            chunk = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Chunk does not fit: hand out a prefix and keep the remainder.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
109
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python RawIOBase."""
115
116
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit, scripted read()."""

    def read(self, n=None):
        """Return the next scripted entry, ignoring ``n``.

        Once the script is exhausted the call is counted in
        ``_extraneous_reads`` and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script is expected here; a bare ``except`` would
            # also hide KeyboardInterrupt/SystemExit and genuine bugs.
            self._extraneous_reads += 1
            return b""
126
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    """Raw mock whose methods report deliberately bogus results."""

    def write(self, b):
        # Report twice as many bytes as were actually recorded.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted data twice over.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report an inflated count.
        super().readinto(buf)
        return 5 * len(buf)
150
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
156
157
class CloseFailureIO(MockRawIO):
    """Raw mock whose first close() raises OSError after marking it closed."""

    closed = 0

    def close(self):
        if self.closed:
            # Already closed: later calls are silent no-ops.
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
171
172
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() in read_history."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        # A None result is logged as None, anything else by its length.
        if result is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(result))
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads of a C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads of a pure-Python BytesIO."""
194
195
class MockUnseekableIO:
    """Mixin that makes a stream refuse every positioning operation.

    Concrete subclasses supply ``UnsupportedOperation``, the exception type
    raised by seek(), tell() and truncate().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
208
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable C BytesIO raising the C module's UnsupportedOperation."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable pure-Python BytesIO raising pyio's UnsupportedOperation."""

    UnsupportedOperation = pyio.UnsupportedOperation
214
215
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block.

    After block_on(char), a write stops just before the first occurrence
    of ``char``; a write that starts with ``char`` returns None and clears
    the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                position = payload.index(self._blocker_char)
            except ValueError:
                position = None   # blocker absent: write everything
            if position is not None:
                if position > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:position])
                    return position
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
259
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
Neal Norwitze7789b12008-03-24 06:18:09 +0000269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000272 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000273 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000277 f.truncate(0)
278 self.assertEqual(f.tell(), 5)
279 f.seek(0)
280
281 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000284 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000287 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000289 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(-1, 2), 13)
291 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000295 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
Guido van Rossum9b76da62007-04-11 01:09:03 +0000297 def read_ops(self, f, buffered=False):
298 data = f.read(5)
299 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.readinto(data), 5)
302 self.assertEqual(data, b" worl")
303 self.assertEqual(f.readinto(data), 2)
304 self.assertEqual(len(data), 5)
305 self.assertEqual(data[:2], b"d\n")
306 self.assertEqual(f.seek(0), 0)
307 self.assertEqual(f.read(20), b"hello world\n")
308 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000309 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000310 self.assertEqual(f.seek(-6, 2), 6)
311 self.assertEqual(f.read(5), b"world")
312 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000313 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000314 self.assertEqual(f.seek(-6, 1), 5)
315 self.assertEqual(f.read(5), b" worl")
316 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000317 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 if buffered:
319 f.seek(0)
320 self.assertEqual(f.read(), b"hello world\n")
321 f.seek(6)
322 self.assertEqual(f.read(), b"world\n")
323 self.assertEqual(f.read(), b"")
324
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 LARGE = 2**31
326
Guido van Rossum53807da2007-04-10 19:01:47 +0000327 def large_file_ops(self, f):
328 assert f.readable()
329 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(self.LARGE), self.LARGE)
331 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 3)
334 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
337 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
341 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 self.assertEqual(f.read(2), b"x")
343
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 def test_invalid_operations(self):
345 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000347 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "wb", buffering=0) as fp:
352 self.assertRaises(exc, fp.read)
353 self.assertRaises(exc, fp.readline)
354 with self.open(support.TESTFN, "rb", buffering=0) as fp:
355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, b"blah")
359 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000360 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000361 self.assertRaises(exc, fp.write, "blah")
362 self.assertRaises(exc, fp.writelines, ["blah\n"])
363 # Non-zero seeking from current or end pos
364 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
365 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
Martin Panter754aab22016-03-31 07:21:56 +0000367 def test_optional_abilities(self):
368 # Test for OSError when optional APIs are not supported
369 # The purpose of this test is to try fileno(), reading, writing and
370 # seeking operations with various objects that indicate they do not
371 # support these operations.
372
373 def pipe_reader():
374 [r, w] = os.pipe()
375 os.close(w) # So that read() is harmless
376 return self.FileIO(r, "r")
377
378 def pipe_writer():
379 [r, w] = os.pipe()
380 self.addCleanup(os.close, r)
381 # Guarantee that we can write into the pipe without blocking
382 thread = threading.Thread(target=os.read, args=(r, 100))
383 thread.start()
384 self.addCleanup(thread.join)
385 return self.FileIO(w, "w")
386
387 def buffered_reader():
388 return self.BufferedReader(self.MockUnseekableIO())
389
390 def buffered_writer():
391 return self.BufferedWriter(self.MockUnseekableIO())
392
393 def buffered_random():
394 return self.BufferedRandom(self.BytesIO())
395
396 def buffered_rw_pair():
397 return self.BufferedRWPair(self.MockUnseekableIO(),
398 self.MockUnseekableIO())
399
400 def text_reader():
401 class UnseekableReader(self.MockUnseekableIO):
402 writable = self.BufferedIOBase.writable
403 write = self.BufferedIOBase.write
404 return self.TextIOWrapper(UnseekableReader(), "ascii")
405
406 def text_writer():
407 class UnseekableWriter(self.MockUnseekableIO):
408 readable = self.BufferedIOBase.readable
409 read = self.BufferedIOBase.read
410 return self.TextIOWrapper(UnseekableWriter(), "ascii")
411
412 tests = (
413 (pipe_reader, "fr"), (pipe_writer, "fw"),
414 (buffered_reader, "r"), (buffered_writer, "w"),
415 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
416 (text_reader, "r"), (text_writer, "w"),
417 (self.BytesIO, "rws"), (self.StringIO, "rws"),
418 )
419 for [test, abilities] in tests:
420 if test is pipe_writer and not threading:
421 continue # Skip subtest that uses a background thread
422 with self.subTest(test), test() as obj:
423 readable = "r" in abilities
424 self.assertEqual(obj.readable(), readable)
425 writable = "w" in abilities
426 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000427
428 if isinstance(obj, self.TextIOBase):
429 data = "3"
430 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
431 data = b"3"
432 else:
433 self.fail("Unknown base class")
434
435 if "f" in abilities:
436 obj.fileno()
437 else:
438 self.assertRaises(OSError, obj.fileno)
439
440 if readable:
441 obj.read(1)
442 obj.read()
443 else:
444 self.assertRaises(OSError, obj.read, 1)
445 self.assertRaises(OSError, obj.read)
446
447 if writable:
448 obj.write(data)
449 else:
450 self.assertRaises(OSError, obj.write, data)
451
Martin Panter3ee147f2016-03-31 21:05:31 +0000452 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000453 pipe_reader, pipe_writer):
454 # Pipes seem to appear as seekable on Windows
455 continue
456 seekable = "s" in abilities
457 self.assertEqual(obj.seekable(), seekable)
458
Martin Panter754aab22016-03-31 07:21:56 +0000459 if seekable:
460 obj.tell()
461 obj.seek(0)
462 else:
463 self.assertRaises(OSError, obj.tell)
464 self.assertRaises(OSError, obj.seek, 0)
465
466 if writable and seekable:
467 obj.truncate()
468 obj.truncate(0)
469 else:
470 self.assertRaises(OSError, obj.truncate)
471 self.assertRaises(OSError, obj.truncate, 0)
472
Antoine Pitrou13348842012-01-29 18:36:34 +0100473 def test_open_handles_NUL_chars(self):
474 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300475 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100476
477 bytes_fn = bytes(fn_with_NUL, 'ascii')
478 with warnings.catch_warnings():
479 warnings.simplefilter("ignore", DeprecationWarning)
480 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100481
Guido van Rossum28524c72007-02-27 05:47:44 +0000482 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000483 with self.open(support.TESTFN, "wb", buffering=0) as f:
484 self.assertEqual(f.readable(), False)
485 self.assertEqual(f.writable(), True)
486 self.assertEqual(f.seekable(), True)
487 self.write_ops(f)
488 with self.open(support.TESTFN, "rb", buffering=0) as f:
489 self.assertEqual(f.readable(), True)
490 self.assertEqual(f.writable(), False)
491 self.assertEqual(f.seekable(), True)
492 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000493
Guido van Rossum87429772007-04-10 21:06:59 +0000494 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000495 with self.open(support.TESTFN, "wb") as f:
496 self.assertEqual(f.readable(), False)
497 self.assertEqual(f.writable(), True)
498 self.assertEqual(f.seekable(), True)
499 self.write_ops(f)
500 with self.open(support.TESTFN, "rb") as f:
501 self.assertEqual(f.readable(), True)
502 self.assertEqual(f.writable(), False)
503 self.assertEqual(f.seekable(), True)
504 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000505
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000506 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000507 with self.open(support.TESTFN, "wb") as f:
508 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
509 with self.open(support.TESTFN, "rb") as f:
510 self.assertEqual(f.readline(), b"abc\n")
511 self.assertEqual(f.readline(10), b"def\n")
512 self.assertEqual(f.readline(2), b"xy")
513 self.assertEqual(f.readline(4), b"zzy\n")
514 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000515 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000516 self.assertRaises(TypeError, f.readline, 5.3)
517 with self.open(support.TESTFN, "r") as f:
518 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000519
Guido van Rossum28524c72007-02-27 05:47:44 +0000520 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000521 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000522 self.write_ops(f)
523 data = f.getvalue()
524 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000525 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000526 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000527
Guido van Rossum53807da2007-04-10 19:01:47 +0000528 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000529 # On Windows and Mac OSX this test comsumes large resources; It takes
530 # a long time to build the >2GB file and takes >2GB of disk space
531 # therefore the resource must be enabled to run this test.
532 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600533 support.requires(
534 'largefile',
535 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000536 with self.open(support.TESTFN, "w+b", 0) as f:
537 self.large_file_ops(f)
538 with self.open(support.TESTFN, "w+b") as f:
539 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000540
541 def test_with_open(self):
542 for bufsize in (0, 1, 100):
543 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000544 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000545 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000546 self.assertEqual(f.closed, True)
547 f = None
548 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000549 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000550 1/0
551 except ZeroDivisionError:
552 self.assertEqual(f.closed, True)
553 else:
554 self.fail("1/0 didn't raise an exception")
555
Antoine Pitrou08838b62009-01-21 00:55:13 +0000556 # issue 5008
557 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000558 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000559 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000561 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000563 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300565 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000566
Guido van Rossum87429772007-04-10 21:06:59 +0000567 def test_destructor(self):
568 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000569 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000570 def __del__(self):
571 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000572 try:
573 f = super().__del__
574 except AttributeError:
575 pass
576 else:
577 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000578 def close(self):
579 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000580 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000581 def flush(self):
582 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000583 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000584 with support.check_warnings(('', ResourceWarning)):
585 f = MyFileIO(support.TESTFN, "wb")
586 f.write(b"xxx")
587 del f
588 support.gc_collect()
589 self.assertEqual(record, [1, 2, 3])
590 with self.open(support.TESTFN, "rb") as f:
591 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592
593 def _check_base_destructor(self, base):
594 record = []
595 class MyIO(base):
596 def __init__(self):
597 # This exercises the availability of attributes on object
598 # destruction.
599 # (in the C version, close() is called by the tp_dealloc
600 # function, not by __del__)
601 self.on_del = 1
602 self.on_close = 2
603 self.on_flush = 3
604 def __del__(self):
605 record.append(self.on_del)
606 try:
607 f = super().__del__
608 except AttributeError:
609 pass
610 else:
611 f()
612 def close(self):
613 record.append(self.on_close)
614 super().close()
615 def flush(self):
616 record.append(self.on_flush)
617 super().flush()
618 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000619 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000620 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000621 self.assertEqual(record, [1, 2, 3])
622
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000623 def test_IOBase_destructor(self):
624 self._check_base_destructor(self.IOBase)
625
626 def test_RawIOBase_destructor(self):
627 self._check_base_destructor(self.RawIOBase)
628
629 def test_BufferedIOBase_destructor(self):
630 self._check_base_destructor(self.BufferedIOBase)
631
632 def test_TextIOBase_destructor(self):
633 self._check_base_destructor(self.TextIOBase)
634
Guido van Rossum87429772007-04-10 21:06:59 +0000635 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000636 with self.open(support.TESTFN, "wb") as f:
637 f.write(b"xxx")
638 with self.open(support.TESTFN, "rb") as f:
639 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000640
Guido van Rossumd4103952007-04-12 05:44:49 +0000641 def test_array_writes(self):
642 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000643 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000644 with self.open(support.TESTFN, "wb", 0) as f:
645 self.assertEqual(f.write(a), n)
646 with self.open(support.TESTFN, "wb") as f:
647 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000648
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000649 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000650 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000651 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000653 def test_read_closed(self):
654 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000655 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 with self.open(support.TESTFN, "r") as f:
657 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000658 self.assertEqual(file.read(), "egg\n")
659 file.seek(0)
660 file.close()
661 self.assertRaises(ValueError, file.read)
662
663 def test_no_closefd_with_filename(self):
664 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000666
667 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000668 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000669 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000671 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000672 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000673 self.assertEqual(file.buffer.raw.closefd, False)
674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675 def test_garbage_collection(self):
676 # FileIO objects are collected, and collecting them flushes
677 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000678 with support.check_warnings(('', ResourceWarning)):
679 f = self.FileIO(support.TESTFN, "wb")
680 f.write(b"abcxxx")
681 f.f = f
682 wr = weakref.ref(f)
683 del f
684 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300685 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000686 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000688
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000689 def test_unbounded_file(self):
690 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
691 zero = "/dev/zero"
692 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000693 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000694 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000695 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000696 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000697 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000698 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000699 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000700 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000701 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000702 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000703 self.assertRaises(OverflowError, f.read)
704
def check_flush_error_on_close(self, *args, **kwargs):
    """Check close() when flush() fails on a file opened with self.open().

    *args and **kwargs are forwarded unchanged to self.open().  The check
    verifies that the OSError raised by flush() escapes close(), that the
    file is closed anyway, and that flush() ran while the file was still
    open.
    """
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Record whether the file was already closed when flush() ran.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)       # flush() called
    self.assertFalse(closed_at_flush[0])   # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200719
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    # The same three scenarios (path, owned fd, borrowed fd) are run for
    # each layer: raw file, buffered io, text io.
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    open_flags = os.O_WRONLY | os.O_CREAT
    for mode, options in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **options)
        owned_fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(owned_fd, mode, **options)
        borrowed_fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(borrowed_fd, mode, closefd=False,
                                        **options)
        # closefd=False leaves the descriptor open, so close it here.
        os.close(borrowed_fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000743
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed file must
    # raise ValueError.
    raw_file = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw_file.close()
    with self.assertRaises(ValueError):
        raw_file.flush()
750
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally).
    # The mock is fed b"abc", b"d", None, b"efg", None; read(2) is expected
    # to hand the data back in pieces of at most two bytes, to return None
    # where the source has a None entry, and b"" once exhausted.
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    self.assertEqual(rawio.read(2), b"ab")
    self.assertEqual(rawio.read(2), b"c")
    self.assertEqual(rawio.read(2), b"d")
    # assertIsNone checks identity; assertEqual(x, None) would also accept
    # any object that merely compares equal to None.
    self.assertIsNone(rawio.read(2))
    self.assertEqual(rawio.read(2), b"ef")
    self.assertEqual(rawio.read(2), b"g")
    self.assertIsNone(rawio.read(2))
    self.assertEqual(rawio.read(2), b"")
763
def test_types_have_dict(self):
    # Instances of each of these io types must expose a __dict__.
    factories = (
        self.IOBase,
        self.RawIOBase,
        self.TextIOBase,
        self.StringIO,
        self.BytesIO,
    )
    # Build every instance first, then check them one by one.
    instances = tuple(make() for make in factories)
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
774
def test_opener(self):
    # A custom opener supplies the descriptor, so the path given to open()
    # is never looked up on disk.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    prepared_fd = os.open(support.TESTFN, os.O_RDONLY)
    with self.open("non-existent", "r",
                   opener=lambda path, flags: prepared_fd) as reader:
        self.assertEqual(reader.read(), "egg\n")
783
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must never close a descriptor it
    # does not own, neither on re-initialisation nor on close().
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            borrowed = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close first
            borrowed.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close second
            borrowed.close()
            second.readline()
795
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected with ValueError, and the
    # failed open() must not emit a ResourceWarning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300800
def test_invalid_newline(self):
    # An unsupported newline value is rejected with ValueError, and the
    # failed open() must not emit a ResourceWarning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300805
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200806
class CIOTest(IOTest):
    # IOTest plus one finalization regression test.
    # NOTE(review): the "C" prefix and the file header suggest this class is
    # run against the C implementation; the wiring that sets self.IOBase etc.
    # is outside this section -- confirm there.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # The self-reference puts the instance in a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the local references to both the class and the instance.
        del MyIO
        del obj
        support.gc_collect()
        # The weak reference is dead only if the instance was collected.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000826
class PyIOTest(IOTest):
    # Runs the inherited IOTest tests unchanged; adds no tests of its own.
    # NOTE(review): presumably bound to the pure-Python (pyio) implementation
    # by code outside this section -- confirm where the attributes are set.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000829
Guido van Rossuma9e20242007-03-08 00:43:48 +0000830
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the public API of io.RawIOBase (C) against
    # pyio.RawIOBase (Python) in both directions.
    #
    # support.detect_api_mismatch(ref_api, other_api) reports the names
    # present on ref_api that are missing from other_api.  Each test
    # therefore passes as ref_api the class whose methods the *other* class
    # is required to have, so the failure message names the class that is
    # actually lacking something.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        # ref = C class: anything reported is missing from the Python class.
        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        # ref = Python class: anything reported is missing from the C class.
        # '__weakref__' is ignored here, as in the original check made in
        # this direction.
        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
                                               ignore=('__weakref__',))
        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
844
845
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: concrete test classes provide `tp` (the buffered type under
    # test) together with the mock raw-IO classes used below (MockRawIO,
    # CloseFailureIO, MockUnseekableIO, UnsupportedOperation), which are
    # defined outside this section.

    def test_detach(self):
        # detach() returns the underlying raw object; a second detach()
        # raises ValueError.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # 42 is presumably the value MockRawIO.fileno() returns (the mock is
        # defined outside this section).
        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # called in that order when the object is destroyed.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__; call it only
                # when it exists.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The buffered object is a temporary, so it is destroyed while
            # the AttributeError from the missing attribute is propagating.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw object has
        # a name attribute, that name (str or bytes).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []
        def bad_flush():
            # Snapshot the closed state of both layers at flush time.
            closed[:] = [b.closed, raw.closed]
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is the
        # one raised and the flush error is chained as its __context__.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same scenario as test_close_error_on_close, but the errors are
        # NameErrors produced by referencing undefined names.
        raw = self.MockRawIO()
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards raises.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream must raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
1001
Guido van Rossum78892e42007-04-06 17:31:18 +00001002
class SizeofTest:
    # Mixin checking that sys.getsizeof() of a buffered object accounts for
    # its internal buffer.  Expects `tp` and `MockRawIO` on the test class.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        # Fixed overhead = reported size minus the buffer itself.
        overhead = sys.getsizeof(
            self.tp(self.MockRawIO(), buffer_size=small)) - small
        bigger = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bigger), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        capacity = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=capacity)
        size_without_buffer = sys.getsizeof(buffered) - capacity
        buffered.close()
        # After close() the reported size must no longer include the buffer.
        self.assertEqual(sys.getsizeof(buffered), size_without_buffer)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001024
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader type bound to `tp` by the concrete
    # subclasses.  The mock raw-IO classes used here (MockRawIO, MockFileIO,
    # MockUnseekableIO, MisbehavedRawIO) are defined outside this section.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object; non-positive buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ only must be safely destroyable and
        # must refuse to read until __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must be served from the buffer when possible; rawio._reads
        # is the mock's counter of raw reads.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Only one byte is left: the second byte of b keeps its old value.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same, with a None entry following the data in the raw source.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Destination larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def test_readinto_array(self):
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Check the precondition directly on the element size; unlike a bare
        # assert this is also enforced when Python runs with -O.
        self.assertGreater(b.itemsize, 1)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto1_array(self):
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Check the precondition directly on the element size; unlike a bare
        # assert this is also enforced when Python runs with -O.
        self.assertGreater(b.itemsize, 1)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto1(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readlines(self):
        # A fresh reader is built for every call so the cases are independent.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected sizes recorded in rawio.read_history]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Checked both before and after a read has filled the buffer.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this hard-codes io.BufferedReader / io.BytesIO rather
        # than self.tp, so every subclass exercises the same C type here --
        # confirm whether that is intentional before switching to self.tp.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1300
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001301
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # BufferedReaderTest bound to the C implementation, plus checks that
    # only apply to it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; register its removal up front so
        # the file is not left behind, whether or not the test passes.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # The self-reference puts the object in a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must mention
        # the class name.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1348
1349
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests with `tp` bound to the
    # pure-Python pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001352
Guido van Rossuma9e20242007-03-08 00:43:48 +00001353
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001354class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1355 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001356
def test_constructor(self):
    # __init__ may be re-run on a live writer; non-positive buffer sizes
    # are rejected, and data written before and after must all reach the
    # raw object.
    raw = self.MockRawIO()
    writer = self.tp(raw)
    for init_options in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
        writer.__init__(raw, **init_options)
    self.assertEqual(3, writer.write(b"abc"))
    writer.flush()
    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            writer.__init__(raw, buffer_size=bad_size)
    writer.__init__(raw)
    self.assertEqual(3, writer.write(b"ghi"))
    writer.flush()
    self.assertEqual(b"".join(raw._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001372
def test_uninitialized(self):
    # An object made with __new__ alone must be safely destroyable and
    # must refuse writes until __init__ has run.
    cls = self.tp
    discarded = cls.__new__(cls)
    del discarded
    writer = cls.__new__(cls)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        writer.write(b'')
    writer.__init__(self.MockRawIO())
    self.assertEqual(writer.write(b''), 0)
1382
def test_detach_flush(self):
    # detach() must flush pending data to the raw object first.
    sink = self.MockRawIO()
    writer = self.tp(sink)
    writer.write(b"howdy!")
    # Nothing has reached the raw object yet: the data is still buffered.
    self.assertFalse(sink._write_stack)
    writer.detach()
    self.assertEqual(sink._write_stack, [b"howdy!"])
1390
def test_write(self):
    # Write to the buffered IO but don't overflow the buffer.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    # Three bytes fit in the 8-byte buffer, so nothing reaches the sink.
    self.assertFalse(sink._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001397
def test_write_overflow(self):
    # Writing more than the buffer holds forces implicit flushes.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    payload = b"abcdefghijklmnop"
    step = 3
    for start in range(0, len(payload), step):
        buffered.write(payload[start:start + step])
    already_flushed = b"".join(sink._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(already_flushed.startswith(payload[:-8]),
                    already_flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001408
def check_writes(self, intermediate_func):
    # Helper shared by several tests: perform lots of writes, calling
    # intermediate_func(bufio) after each one, then check that the data
    # received by the raw stream matches what was written.
    payload = bytes(range(256)) * 1000
    total = len(payload)
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 13)
    # Write sizes: repeat each N 15 times, then proceed to N+1.
    write_sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    while offset < total:
        # Never write past the end of the payload.
        step = min(next(write_sizes), total - offset)
        chunk = payload[offset:offset + step]
        self.assertEqual(buffered.write(chunk), step)
        intermediate_func(buffered)
        offset += step
    buffered.flush()
    self.assertEqual(payload, b"".join(raw_stream._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001428
def test_writes(self):
    # Plain sequence of writes: nothing is done between two writes.
    def do_nothing(bufio):
        return None

    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001431
def test_writes_and_flushes(self):
    # Same as test_writes, but flush explicitly after every write.
    def flush_now(bufio):
        return bufio.flush()

    self.check_writes(flush_now)
Guido van Rossum01a27522007-03-07 01:00:12 +00001434
def test_writes_and_seeks(self):
    # After every write, move around the current position and come back:
    # first using absolute seeks (whence=0), then relative ones (whence=1).
    def seek_absolute(bufio):
        here = bufio.tell()
        for target in (here + 1, here - 1, here):
            bufio.seek(target, 0)

    def seek_relative(bufio):
        here = bufio.seek(0, 1)
        for delta in (+1, -1):
            bufio.seek(delta, 1)
        bufio.seek(here, 0)

    for wander in (seek_absolute, seek_relative):
        self.check_writes(wander)
Guido van Rossum01a27522007-03-07 01:00:12 +00001448
def test_writes_and_truncates(self):
    # Same as test_writes, but truncate at the current position after
    # every write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())

    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001451
def test_write_non_blocking(self):
    # Drive an 8-byte buffer on top of MockNonBlockWriterIO and check the
    # byte counts reported by write() as well as what the raw stream got.
    nonblocking_raw = self.MockNonBlockWriterIO()
    buffered = self.tp(nonblocking_raw, 8)

    for chunk in (b"abcd", b"efghi"):
        self.assertEqual(buffered.write(chunk), len(chunk))
    # 1 byte will be written, the rest will be buffered
    nonblocking_raw.block_on(b"k")
    self.assertEqual(buffered.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    nonblocking_raw.block_on(b"0")
    try:
        buffered.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        accepted = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(accepted, 16)
    self.assertEqual(nonblocking_raw.pop_written(),
                     b"abcdefghijklmnopqrwxyz")

    self.assertEqual(buffered.write(b"ABCDEFGHI"), 9)
    flushed = nonblocking_raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001478
def test_write_and_rewind(self):
    # Write, seek back to the start, overwrite the first two bytes, then
    # seek to the end and append; check the BytesIO contents on the way.
    backing = io.BytesIO()
    buffered = self.tp(backing, 4)
    self.assertEqual(buffered.write(b"abcdef"), 6)
    self.assertEqual(buffered.tell(), 6)
    buffered.seek(0, 0)
    self.assertEqual(buffered.write(b"XY"), 2)
    buffered.seek(6, 0)
    self.assertEqual(backing.getvalue(), b"XYcdef")
    self.assertEqual(buffered.write(b"123456"), 6)
    buffered.flush()
    self.assertEqual(backing.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001491
def test_flush(self):
    # An explicit flush() hands the buffered bytes to the raw stream.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_raw_write = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_raw_write)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001498
def test_writelines(self):
    # writelines() with a plain list of bytes objects.
    lines = [b'ab', b'cd', b'ef']
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.writelines(lines)
    buffered.flush()
    received = b''.join(raw_stream._write_stack)
    self.assertEqual(received, b'abcdef')
1506
def test_writelines_userlist(self):
    # writelines() with a UserList instead of a real list.
    lines = UserList([b'ab', b'cd', b'ef'])
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.writelines(lines)
    buffered.flush()
    received = b''.join(raw_stream._write_stack)
    self.assertEqual(received, b'abcdef')
1514
def test_writelines_error(self):
    # writelines() must raise TypeError for each of these arguments:
    # a list of ints, None, and a str.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        with self.assertRaises(TypeError):
            buffered.writelines(bad_lines)
1521
def test_destructor(self):
    # Drop the only reference to a buffered object holding pending data,
    # run a collection, and check the data reached the raw stream.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    first_raw_write = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_raw_write)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001529
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    #
    # The test creates support.TESTFN on disk; register its removal up
    # front so the file is deleted whether the assertions pass or fail
    # (test_threads already cleans up the same file in a finally clause).
    self.addCleanup(support.unlink, support.TESTFN)
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
        bufio = self.tp(raw, 8)
        bufio.write(b"abcdef")
        # truncate(3) returns the new size...
        self.assertEqual(bufio.truncate(3), 3)
        # ...but leaves the stream position where the write ended.
        self.assertEqual(bufio.tell(), 6)
    # Re-read the file unbuffered to see what actually reached the disk.
    with self.open(support.TESTFN, "rb", buffering=0) as f:
        self.assertEqual(f.read(), b"abc")
1539
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    # Write out many bytes from many threads and test they were
    # all flushed.
    try:
        repeats = 1000
        payload = bytes(range(256)) * repeats
        # Pre-slice the payload into chunks of alternating size 1 and 19
        # and queue them up for the worker threads.
        chunk_sizes = cycle([1, 19])
        pending = deque()
        offset = 0
        while offset < len(payload):
            step = next(chunk_sizes)
            pending.append(payload[offset:offset + step])
            offset += step
        del payload
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            buffered = self.tp(raw, 8)
            failures = []

            def drain_queue():
                # Worker body: write queued chunks until none are left.
                # Any exception is recorded for the main thread, then
                # re-raised.
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            return
                        buffered.write(chunk)
                except Exception as exc:
                    failures.append(exc)
                    raise

            workers = [threading.Thread(target=drain_queue)
                       for _ in range(20)]
            with support.start_threads(workers):
                time.sleep(0.02)  # yield
            self.assertFalse(failures,
                "the following exceptions were caught: %r" % failures)
            buffered.close()
        with self.open(support.TESTFN, "rb") as result_file:
            written = result_file.read()
        # Write order is not checked, only that each byte value occurs
        # the expected number of times.
        for byte_value in range(256):
            self.assertEqual(written.count(bytes([byte_value])), repeats)
    finally:
        support.unlink(support.TESTFN)
1587
def test_misbehaved_io(self):
    # On top of MisbehavedRawIO, seek(), tell() and a write larger than
    # the 5-byte buffer must each raise OSError.
    misbehaving_raw = self.MisbehavedRawIO()
    buffered = self.tp(misbehaving_raw, 5)
    with self.assertRaises(OSError):
        buffered.seek(0)
    with self.assertRaises(OSError):
        buffered.tell()
    with self.assertRaises(OSError):
        buffered.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001594
def test_max_buffer_size_removal(self):
    # A third positional argument (the former max_buffer_size) is
    # rejected with TypeError.
    self.assertRaises(TypeError, self.tp, self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001598
def test_write_error_on_close(self):
    # If the raw write() fails while close() flushes, the error must
    # propagate and the buffered object must still end up closed.
    def failing_write(data):
        raise OSError()

    raw_stream = self.MockRawIO()
    raw_stream.write = failing_write
    buffered = self.tp(raw_stream)
    buffered.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
1608
Benjamin Peterson59406a92009-03-26 17:10:29 +00001609
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against the C implementation
    # and adds checks that are specific to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialization, a write on the same
        # object must raise ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test writes support.TESTFN; register its removal first so
        # the file does not outlive the test, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Make the object part of a reference cycle so that only the
            # garbage collector can reclaim it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments must name the class.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1653
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001654
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pure-Python
    # implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001657
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair. The class under
    # test is read from the ``tp`` attribute, which subclasses provide.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # Create an instance with __new__ only and drop it straight away.
        pair_type = self.tp
        throwaway = pair_type.__new__(pair_type)
        del throwaway
        # A second uninitialized instance must refuse to read or write...
        rw_pair = pair_type.__new__(pair_type)
        failure_types = (ValueError, AttributeError)
        failure_text = 'uninitialized|has no attribute'
        with self.assertRaisesRegex(failure_types, failure_text):
            rw_pair.read(0)
        with self.assertRaisesRegex(failure_types, failure_text):
            rw_pair.write(b'')
        # ...and work once __init__ has been run on it.
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument (the former max_buffer_size) is
        # rejected with TypeError.
        self.assertRaises(TypeError, self.tp,
                          self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(rw_pair.read(size), expected)
        self.assertEqual(rw_pair.read(), b"ef")
        # read(None) on a fresh pair returns everything.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        write_side = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), write_side)

        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(write_side._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # The read that follows still returns the peeked bytes.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        # The bare name below is deliberately undefined: evaluating it
        # raises the NameError that stands in for a failing close().
        def reader_close():
            reader_non_existing

        read_side = self.MockRawIO()
        read_side.close = reader_close
        write_side = self.MockRawIO()
        rw_pair = self.tp(read_side, write_side)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw_pair.closed)
        self.assertFalse(read_side.closed)
        self.assertTrue(write_side.closed)

    def test_writer_close_error_on_close(self):
        # Same as above, with the writer's close() failing instead.
        def writer_close():
            writer_non_existing

        read_side = self.MockRawIO()
        write_side = self.MockRawIO()
        write_side.close = writer_close
        rw_pair = self.tp(read_side, write_side)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(rw_pair.closed)
        self.assertTrue(read_side.closed)
        self.assertFalse(write_side.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is the one raised
        # and the writer's error is attached to it as __context__.
        def reader_close():
            reader_non_existing

        def writer_close():
            writer_non_existing

        read_side = self.MockRawIO()
        read_side.close = reader_close
        write_side = self.MockRawIO()
        write_side.close = writer_close
        rw_pair = self.tp(read_side, write_side)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(read_side.closed)
        self.assertFalse(write_side.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            # Raw stream whose isatty() answer is chosen at construction.
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        combinations = [(False, False), (True, False),
                        (False, True), (True, True)]
        for reader_tty, writer_tty in combinations:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(rw_pair.isatty())
            else:
                self.assertFalse(rw_pair.isatty())

    def test_weakref_clearing(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        weak = weakref.ref(rw_pair)
        # Release the pair first, then the weak reference to it.
        rw_pair = None
        weak = None  # Shouldn't segfault.
1841
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPair tests against the C implementation.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001844
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPair tests against the pure-Python implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001848
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for BufferedRandom: inherits the reader and writer test suites
    # and adds tests that interleave reads, writes and seeks.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        raw_stream = self.MockRawIO((b"asdf", b"ghjk"))
        random_io = self.tp(raw_stream, 8)

        self.assertEqual(b"as", random_io.read(2))
        for chunk in (b"ddd", b"eee"):
            random_io.write(chunk)
        self.assertFalse(raw_stream._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", random_io.read())
        self.assertEqual(b"dddeee", raw_stream._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        random_io = self.tp(backing)

        self.assertEqual(b"as", random_io.read(2))
        self.assertEqual(2, random_io.tell())
        random_io.seek(0, 0)
        self.assertEqual(b"asdf", random_io.read(4))

        # Overwrite four bytes at position 4, then re-read from the start.
        random_io.write(b"123f")
        random_io.seek(0, 0)
        self.assertEqual(b"asdf123fl", random_io.read())
        self.assertEqual(9, random_io.tell())
        # Seek relative to the end (whence=2), then to the current
        # position (whence=1).
        random_io.seek(-4, 2)
        self.assertEqual(5, random_io.tell())
        random_io.seek(2, 1)
        self.assertEqual(7, random_io.tell())
        self.assertEqual(b"fl", random_io.read(11))
        random_io.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        with self.assertRaises(TypeError):
            random_io.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Helper: read_func(bufio[, n]) must return up to n bytes (or the
        # rest of the stream when n is omitted) and advance the position.
        backing = self.BytesIO(b"abcdefghi")
        random_io = self.tp(backing)

        self.assertEqual(b"ab", read_func(random_io, 2))
        random_io.write(b"12")
        self.assertEqual(b"ef", read_func(random_io, 2))
        self.assertEqual(6, random_io.tell())
        random_io.flush()
        self.assertEqual(6, random_io.tell())
        self.assertEqual(b"ghi", read_func(random_io))
        # Change the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        random_io.flush()
        random_io.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(random_io, 3))

    def test_flush_and_read(self):
        def read_via_read(bufio, *args):
            return bufio.read(*args)

        self.check_flush_and_read(read_via_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(bufio, n=-1):
            # Without an explicit size, use a target large enough for
            # the whole test stream.
            capacity = n if n >= 0 else 9999
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])

        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            if n != -1:
                peeked = peeked[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(peeked), 1)
            return peeked

        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        random_io = self.tp(backing)

        for chunk in (b"123", b"45"):
            random_io.write(chunk)
            random_io.flush()
        random_io.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", random_io.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def peek_ahead(bufio):
            bufio.peek(1)

        def peek_behind(bufio):
            # Step back one byte, peek, then restore the position.
            here = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(here, 0)

        for peeker in (peek_ahead, peek_behind):
            self.check_writes(peeker)

    def test_writes_and_reads(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)

        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)

        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))

        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            random_io = self.tp(backing, 4)
            # Trigger readahead
            self.assertEqual(random_io.read(1), b"A")
            self.assertEqual(random_io.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            random_io.write(b"B" * overwrite_size)
            end_of_write = overwrite_size + 1
            self.assertEqual(random_io.tell(), end_of_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            random_io.flush()
            self.assertEqual(random_io.tell(), end_of_write)
            contents = backing.getvalue()
            self.assertEqual(contents,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(0, size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                random_io = self.tp(backing, 100)
                mutate(random_io, i, j)
                random_io.flush()
                # When i == j the later write (1) wins, hence the order
                # of the two assignments.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        random_io = self.tp(backing, 100)
        self.assertEqual(random_io.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(random_io.truncate(), 2)
        self.assertEqual(random_io.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(random_io.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing:
            with self.tp(backing, 100) as stream:
                stream.write(b"1")
                self.assertEqual(stream.read(1), b'b')
                stream.write(b'2')
                self.assertEqual(stream.read1(1), b'd')
                stream.write(b'3')
                one_byte = bytearray(1)
                stream.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                stream.write(b'4')
                self.assertEqual(stream.peek(1), b'h')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing:
            with self.tp(backing, 100) as stream:
                self.assertEqual(stream.read(1), b'a')
                stream.write(b"2")
                self.assertEqual(stream.read(1), b'c')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing:
            with self.tp(backing) as stream:
                stream.write(b'1')
                self.assertEqual(stream.readline(), b'b\n')
                stream.write(b'2')
                self.assertEqual(stream.readline(), b'def\n')
                stream.write(b'3')
                self.assertEqual(stream.readline(), b'\n')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2076
R David Murray67bfe802013-02-23 21:51:05 -05002077
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom tests against the C implementation and adds
    # checks that are specific to it.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        random_io = self.tp(raw_stream)
        allocation_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(allocation_errors):
            random_io.__init__(raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments must name the class.
        too_many_args = (io.BytesIO(), 1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(*too_many_args)
2099
2100
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against the pure-Python implementation.
    tp = pyio.BufferedRandom
2103
2104
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002105# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2106# properties:
2107# - A single output character can correspond to many bytes of input.
2108# - The number of input bytes to complete the character can be
2109# undetermined until the last input byte is received.
2110# - The number of input bytes can vary depending on previous input.
2111# - A single input byte can correspond to many characters of output.
2112# - The number of output characters can be undetermined until the
2113# last input byte is received.
2114# - The number of output characters can vary depending on previous input.
2115
2116class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2117 """
2118 For testing seek/tell behavior with a stateful, buffering decoder.
2119
2120 Input is a sequence of words. Words may be fixed-length (length set
2121 by input) or variable-length (period-terminated). In variable-length
2122 mode, extra periods are ignored. Possible words are:
2123 - 'i' followed by a number sets the input length, I (maximum 99).
2124 When I is set to 0, words are period-terminated.
2125 - 'o' followed by a number sets the output length, O (maximum 99).
2126 - Any other word is converted into a word followed by a period on
2127 the output. The output word consists of the input word truncated
2128 or padded out with hyphens to make its length equal to O. If O
2129 is 0, the word is output verbatim without truncating or padding.
2130 I and O are initially set to 1. When I changes, any buffered input is
2131 re-scanned according to the new I. EOF also terminates the last word.
2132 """
2133
2134 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002135 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002136 self.reset()
2137
2138 def __repr__(self):
2139 return '<SID %x>' % id(self)
2140
2141 def reset(self):
2142 self.i = 1
2143 self.o = 1
2144 self.buffer = bytearray()
2145
2146 def getstate(self):
2147 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2148 return bytes(self.buffer), i*100 + o
2149
2150 def setstate(self, state):
2151 buffer, io = state
2152 self.buffer = bytearray(buffer)
2153 i, o = divmod(io, 100)
2154 self.i, self.o = i ^ 1, o ^ 1
2155
2156 def decode(self, input, final=False):
2157 output = ''
2158 for b in input:
2159 if self.i == 0: # variable-length, terminated with period
2160 if b == ord('.'):
2161 if self.buffer:
2162 output += self.process_word()
2163 else:
2164 self.buffer.append(b)
2165 else: # fixed-length, terminate after self.i bytes
2166 self.buffer.append(b)
2167 if len(self.buffer) == self.i:
2168 output += self.process_word()
2169 if final and self.buffer: # EOF terminates the last word
2170 output += self.process_word()
2171 return output
2172
2173 def process_word(self):
2174 output = ''
2175 if self.buffer[0] == ord('i'):
2176 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2177 elif self.buffer[0] == ord('o'):
2178 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2179 else:
2180 output = self.buffer.decode('ascii')
2181 if len(output) < self.o:
2182 output += '-'*self.o # pad out with hyphens
2183 if self.o:
2184 output = output[:self.o] # truncate to output length
2185 output += '.'
2186 self.buffer = bytearray()
2187 return output
2188
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002189 codecEnabled = False
2190
2191 @classmethod
2192 def lookupTestDecoder(cls, name):
2193 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002194 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002195 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002196 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002197 incrementalencoder=None,
2198 streamreader=None, streamwriter=None,
2199 incrementaldecoder=cls)
2200
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2204
2205
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002248
2249class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002250
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002251 def setUp(self):
2252 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2253 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002254 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002255
Guido van Rossumd0712812007-04-11 16:32:43 +00002256 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002257 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002259 def test_constructor(self):
2260 r = self.BytesIO(b"\xc3\xa9\n\n")
2261 b = self.BufferedReader(r, 1000)
2262 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002263 t.__init__(b, encoding="latin-1", newline="\r\n")
2264 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002266 t.__init__(b, encoding="utf-8", line_buffering=True)
2267 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002268 self.assertEqual(t.line_buffering, True)
2269 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002270 self.assertRaises(TypeError, t.__init__, b, newline=42)
2271 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2272
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002273 def test_uninitialized(self):
2274 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2275 del t
2276 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2277 self.assertRaises(Exception, repr, t)
2278 self.assertRaisesRegex((ValueError, AttributeError),
2279 'uninitialized|has no attribute',
2280 t.read, 0)
2281 t.__init__(self.MockRawIO())
2282 self.assertEqual(t.read(0), '')
2283
Nick Coghlana9b15242014-02-04 22:11:18 +10002284 def test_non_text_encoding_codecs_are_rejected(self):
2285 # Ensure the constructor complains if passed a codec that isn't
2286 # marked as a text encoding
2287 # http://bugs.python.org/issue20404
2288 r = self.BytesIO()
2289 b = self.BufferedWriter(r)
2290 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2291 self.TextIOWrapper(b, encoding="hex")
2292
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002293 def test_detach(self):
2294 r = self.BytesIO()
2295 b = self.BufferedWriter(r)
2296 t = self.TextIOWrapper(b)
2297 self.assertIs(t.detach(), b)
2298
2299 t = self.TextIOWrapper(b, encoding="ascii")
2300 t.write("howdy")
2301 self.assertFalse(r.getvalue())
2302 t.detach()
2303 self.assertEqual(r.getvalue(), b"howdy")
2304 self.assertRaises(ValueError, t.detach)
2305
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002306 # Operations independent of the detached stream should still work
2307 repr(t)
2308 self.assertEqual(t.encoding, "ascii")
2309 self.assertEqual(t.errors, "strict")
2310 self.assertFalse(t.line_buffering)
2311
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002312 def test_repr(self):
2313 raw = self.BytesIO("hello".encode("utf-8"))
2314 b = self.BufferedReader(raw)
2315 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002316 modname = self.TextIOWrapper.__module__
2317 self.assertEqual(repr(t),
2318 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2319 raw.name = "dummy"
2320 self.assertEqual(repr(t),
2321 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002322 t.mode = "r"
2323 self.assertEqual(repr(t),
2324 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002325 raw.name = b"dummy"
2326 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002327 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002328
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002329 t.buffer.detach()
2330 repr(t) # Should not raise an exception
2331
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002332 def test_line_buffering(self):
2333 r = self.BytesIO()
2334 b = self.BufferedWriter(r, 1000)
2335 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002336 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002337 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002338 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002339 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002340 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002341 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002342
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002343 def test_default_encoding(self):
2344 old_environ = dict(os.environ)
2345 try:
2346 # try to get a user preferred encoding different than the current
2347 # locale encoding to check that TextIOWrapper() uses the current
2348 # locale encoding and not the user preferred encoding
2349 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2350 if key in os.environ:
2351 del os.environ[key]
2352
2353 current_locale_encoding = locale.getpreferredencoding(False)
2354 b = self.BytesIO()
2355 t = self.TextIOWrapper(b)
2356 self.assertEqual(t.encoding, current_locale_encoding)
2357 finally:
2358 os.environ.clear()
2359 os.environ.update(old_environ)
2360
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002361 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002362 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002363 # Issue 15989
2364 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002365 b = self.BytesIO()
2366 b.fileno = lambda: _testcapi.INT_MAX + 1
2367 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2368 b.fileno = lambda: _testcapi.UINT_MAX + 1
2369 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2370
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002371 def test_encoding(self):
2372 # Check the encoding attribute is always set, and valid
2373 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002374 t = self.TextIOWrapper(b, encoding="utf-8")
2375 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002376 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002377 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002378 codecs.lookup(t.encoding)
2379
2380 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002381 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002382 b = self.BytesIO(b"abc\n\xff\n")
2383 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002384 self.assertRaises(UnicodeError, t.read)
2385 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002386 b = self.BytesIO(b"abc\n\xff\n")
2387 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002388 self.assertRaises(UnicodeError, t.read)
2389 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002390 b = self.BytesIO(b"abc\n\xff\n")
2391 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002392 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002393 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002394 b = self.BytesIO(b"abc\n\xff\n")
2395 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002396 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002397
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002398 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002399 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002400 b = self.BytesIO()
2401 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002402 self.assertRaises(UnicodeError, t.write, "\xff")
2403 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002404 b = self.BytesIO()
2405 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002406 self.assertRaises(UnicodeError, t.write, "\xff")
2407 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002408 b = self.BytesIO()
2409 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002410 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002411 t.write("abc\xffdef\n")
2412 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002413 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002414 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 b = self.BytesIO()
2416 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002417 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002418 t.write("abc\xffdef\n")
2419 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002420 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002421
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002422 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002423 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2424
2425 tests = [
2426 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002427 [ '', input_lines ],
2428 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2429 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2430 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002431 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002432 encodings = (
2433 'utf-8', 'latin-1',
2434 'utf-16', 'utf-16-le', 'utf-16-be',
2435 'utf-32', 'utf-32-le', 'utf-32-be',
2436 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002437
Guido van Rossum8358db22007-08-18 21:39:55 +00002438 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002439 # character in TextIOWrapper._pending_line.
2440 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002441 # XXX: str.encode() should return bytes
2442 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002443 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002444 for bufsize in range(1, 10):
2445 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002446 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2447 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002448 encoding=encoding)
2449 if do_reads:
2450 got_lines = []
2451 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002452 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002453 if c2 == '':
2454 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002455 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002456 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002457 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002458 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002459
2460 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(got_line, exp_line)
2462 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002463
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002464 def test_newlines_input(self):
2465 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002466 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2467 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002468 (None, normalized.decode("ascii").splitlines(keepends=True)),
2469 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002470 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2471 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2472 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002473 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 buf = self.BytesIO(testdata)
2475 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002476 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002477 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002478 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002479
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002480 def test_newlines_output(self):
2481 testdict = {
2482 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2483 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2484 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2485 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2486 }
2487 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2488 for newline, expected in tests:
2489 buf = self.BytesIO()
2490 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2491 txt.write("AAA\nB")
2492 txt.write("BB\nCCC\n")
2493 txt.write("X\rY\r\nZ")
2494 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002495 self.assertEqual(buf.closed, False)
2496 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002497
2498 def test_destructor(self):
2499 l = []
2500 base = self.BytesIO
2501 class MyBytesIO(base):
2502 def close(self):
2503 l.append(self.getvalue())
2504 base.close(self)
2505 b = MyBytesIO()
2506 t = self.TextIOWrapper(b, encoding="ascii")
2507 t.write("abc")
2508 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002509 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002510 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002511
2512 def test_override_destructor(self):
2513 record = []
2514 class MyTextIO(self.TextIOWrapper):
2515 def __del__(self):
2516 record.append(1)
2517 try:
2518 f = super().__del__
2519 except AttributeError:
2520 pass
2521 else:
2522 f()
2523 def close(self):
2524 record.append(2)
2525 super().close()
2526 def flush(self):
2527 record.append(3)
2528 super().flush()
2529 b = self.BytesIO()
2530 t = MyTextIO(b, encoding="ascii")
2531 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002532 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002533 self.assertEqual(record, [1, 2, 3])
2534
2535 def test_error_through_destructor(self):
2536 # Test that the exception state is not modified by a destructor,
2537 # even if close() fails.
2538 rawio = self.CloseFailureIO()
2539 def f():
2540 self.TextIOWrapper(rawio).xyzzy
2541 with support.captured_output("stderr") as s:
2542 self.assertRaises(AttributeError, f)
2543 s = s.getvalue().strip()
2544 if s:
2545 # The destructor *may* have printed an unraisable error, check it
2546 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002547 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002548 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002549
Guido van Rossum9b76da62007-04-11 01:09:03 +00002550 # Systematic tests of the text I/O API
2551
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002553 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002554 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002555 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002556 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002557 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002558 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002559 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002560 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002561 self.assertEqual(f.tell(), 0)
2562 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002563 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002564 self.assertEqual(f.seek(0), 0)
2565 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002566 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002567 self.assertEqual(f.read(2), "ab")
2568 self.assertEqual(f.read(1), "c")
2569 self.assertEqual(f.read(1), "")
2570 self.assertEqual(f.read(), "")
2571 self.assertEqual(f.tell(), cookie)
2572 self.assertEqual(f.seek(0), 0)
2573 self.assertEqual(f.seek(0, 2), cookie)
2574 self.assertEqual(f.write("def"), 3)
2575 self.assertEqual(f.seek(cookie), cookie)
2576 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002577 if enc.startswith("utf"):
2578 self.multi_line_test(f, enc)
2579 f.close()
2580
2581 def multi_line_test(self, f, enc):
2582 f.seek(0)
2583 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002584 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002585 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002586 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002587 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002588 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002589 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002590 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002591 wlines.append((f.tell(), line))
2592 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002593 f.seek(0)
2594 rlines = []
2595 while True:
2596 pos = f.tell()
2597 line = f.readline()
2598 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002599 break
2600 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002601 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002602
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002603 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002604 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002605 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002606 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002607 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002608 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002609 p2 = f.tell()
2610 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002611 self.assertEqual(f.tell(), p0)
2612 self.assertEqual(f.readline(), "\xff\n")
2613 self.assertEqual(f.tell(), p1)
2614 self.assertEqual(f.readline(), "\xff\n")
2615 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002616 f.seek(0)
2617 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002618 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002619 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002620 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002621 f.close()
2622
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002623 def test_seeking(self):
2624 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002625 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002626 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002627 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002628 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002629 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002630 suffix = bytes(u_suffix.encode("utf-8"))
2631 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002632 with self.open(support.TESTFN, "wb") as f:
2633 f.write(line*2)
2634 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2635 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002636 self.assertEqual(s, str(prefix, "ascii"))
2637 self.assertEqual(f.tell(), prefix_size)
2638 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002641 # Regression test for a specific bug
2642 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002643 with self.open(support.TESTFN, "wb") as f:
2644 f.write(data)
2645 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2646 f._CHUNK_SIZE # Just test that it exists
2647 f._CHUNK_SIZE = 2
2648 f.readline()
2649 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002651 def test_seek_and_tell(self):
2652 #Test seek/tell using the StatefulIncrementalDecoder.
2653 # Make test faster by doing smaller seeks
2654 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002655
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002656 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002657 """Tell/seek to various points within a data stream and ensure
2658 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002659 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002660 f.write(data)
2661 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662 f = self.open(support.TESTFN, encoding='test_decoder')
2663 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002664 decoded = f.read()
2665 f.close()
2666
Neal Norwitze2b07052008-03-18 19:52:05 +00002667 for i in range(min_pos, len(decoded) + 1): # seek positions
2668 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002669 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002670 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002671 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002672 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002673 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002674 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002675 f.close()
2676
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002677 # Enable the test decoder.
2678 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002679
2680 # Run the tests.
2681 try:
2682 # Try each test case.
2683 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002684 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002685
2686 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002687 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2688 offset = CHUNK_SIZE - len(input)//2
2689 prefix = b'.'*offset
2690 # Don't bother seeking into the prefix (takes too long).
2691 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002692 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002693
2694 # Ensure our test decoder won't interfere with subsequent tests.
2695 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002696 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002697
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002698 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002699 data = "1234567890"
2700 tests = ("utf-16",
2701 "utf-16-le",
2702 "utf-16-be",
2703 "utf-32",
2704 "utf-32-le",
2705 "utf-32-be")
2706 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002707 buf = self.BytesIO()
2708 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002709 # Check if the BOM is written only once (see issue1753).
2710 f.write(data)
2711 f.write(data)
2712 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002713 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002714 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002715 self.assertEqual(f.read(), data * 2)
2716 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002717
Benjamin Petersona1b49012009-03-31 23:11:32 +00002718 def test_unreadable(self):
2719 class UnReadable(self.BytesIO):
2720 def readable(self):
2721 return False
2722 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002723 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002724
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002725 def test_read_one_by_one(self):
2726 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002727 reads = ""
2728 while True:
2729 c = txt.read(1)
2730 if not c:
2731 break
2732 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002733 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002734
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002735 def test_readlines(self):
2736 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2737 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2738 txt.seek(0)
2739 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2740 txt.seek(0)
2741 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2742
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002743 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002744 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002745 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002747 reads = ""
2748 while True:
2749 c = txt.read(128)
2750 if not c:
2751 break
2752 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002753 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002754
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002755 def test_writelines(self):
2756 l = ['ab', 'cd', 'ef']
2757 buf = self.BytesIO()
2758 txt = self.TextIOWrapper(buf)
2759 txt.writelines(l)
2760 txt.flush()
2761 self.assertEqual(buf.getvalue(), b'abcdef')
2762
2763 def test_writelines_userlist(self):
2764 l = UserList(['ab', 'cd', 'ef'])
2765 buf = self.BytesIO()
2766 txt = self.TextIOWrapper(buf)
2767 txt.writelines(l)
2768 txt.flush()
2769 self.assertEqual(buf.getvalue(), b'abcdef')
2770
2771 def test_writelines_error(self):
2772 txt = self.TextIOWrapper(self.BytesIO())
2773 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2774 self.assertRaises(TypeError, txt.writelines, None)
2775 self.assertRaises(TypeError, txt.writelines, b'abc')
2776
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002777 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002778 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002779
2780 # read one char at a time
2781 reads = ""
2782 while True:
2783 c = txt.read(1)
2784 if not c:
2785 break
2786 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002787 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002788
2789 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002790 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002791 txt._CHUNK_SIZE = 4
2792
2793 reads = ""
2794 while True:
2795 c = txt.read(4)
2796 if not c:
2797 break
2798 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002799 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002800
2801 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002802 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002803 txt._CHUNK_SIZE = 4
2804
2805 reads = txt.read(4)
2806 reads += txt.read(4)
2807 reads += txt.readline()
2808 reads += txt.readline()
2809 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002810 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002811
2812 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002813 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002814 txt._CHUNK_SIZE = 4
2815
2816 reads = txt.read(4)
2817 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002818 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002819
2820 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002822 txt._CHUNK_SIZE = 4
2823
2824 reads = txt.read(4)
2825 pos = txt.tell()
2826 txt.seek(0)
2827 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002828 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002829
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002830 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002831 buffer = self.BytesIO(self.testdata)
2832 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002833
2834 self.assertEqual(buffer.seekable(), txt.seekable())
2835
Antoine Pitroue4501852009-05-14 18:55:55 +00002836 def test_append_bom(self):
2837 # The BOM is not written again when appending to a non-empty file
2838 filename = support.TESTFN
2839 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2840 with self.open(filename, 'w', encoding=charset) as f:
2841 f.write('aaa')
2842 pos = f.tell()
2843 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002844 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002845
2846 with self.open(filename, 'a', encoding=charset) as f:
2847 f.write('xxx')
2848 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002849 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002850
2851 def test_seek_bom(self):
2852 # Same test, but when seeking manually
2853 filename = support.TESTFN
2854 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2855 with self.open(filename, 'w', encoding=charset) as f:
2856 f.write('aaa')
2857 pos = f.tell()
2858 with self.open(filename, 'r+', encoding=charset) as f:
2859 f.seek(pos)
2860 f.write('zzz')
2861 f.seek(0)
2862 f.write('bbb')
2863 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002864 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002865
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002866 def test_seek_append_bom(self):
2867 # Same test, but first seek to the start and then to the end
2868 filename = support.TESTFN
2869 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2870 with self.open(filename, 'w', encoding=charset) as f:
2871 f.write('aaa')
2872 with self.open(filename, 'a', encoding=charset) as f:
2873 f.seek(0)
2874 f.seek(0, self.SEEK_END)
2875 f.write('xxx')
2876 with self.open(filename, 'rb') as f:
2877 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2878
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002879 def test_errors_property(self):
2880 with self.open(support.TESTFN, "w") as f:
2881 self.assertEqual(f.errors, "strict")
2882 with self.open(support.TESTFN, "w", errors="replace") as f:
2883 self.assertEqual(f.errors, "replace")
2884
Brett Cannon31f59292011-02-21 19:29:56 +00002885 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002886 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002887 def test_threads_write(self):
2888 # Issue6750: concurrent writes could duplicate data
2889 event = threading.Event()
2890 with self.open(support.TESTFN, "w", buffering=1) as f:
2891 def run(n):
2892 text = "Thread%03d\n" % n
2893 event.wait()
2894 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002895 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002896 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002897 with support.start_threads(threads, event.set):
2898 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002899 with self.open(support.TESTFN) as f:
2900 content = f.read()
2901 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002902 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002903
Antoine Pitrou6be88762010-05-03 16:48:20 +00002904 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002905 # Test that text file is closed despite failed flush
2906 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002907 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002908 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002909 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002910 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002911 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002912 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002913 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002914 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002915 self.assertTrue(txt.buffer.closed)
2916 self.assertTrue(closed) # flush() called
2917 self.assertFalse(closed[0]) # flush() called before file closed
2918 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002919 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002920
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002921 def test_close_error_on_close(self):
2922 buffer = self.BytesIO(self.testdata)
2923 def bad_flush():
2924 raise OSError('flush')
2925 def bad_close():
2926 raise OSError('close')
2927 buffer.close = bad_close
2928 txt = self.TextIOWrapper(buffer, encoding="ascii")
2929 txt.flush = bad_flush
2930 with self.assertRaises(OSError) as err: # exception not swallowed
2931 txt.close()
2932 self.assertEqual(err.exception.args, ('close',))
2933 self.assertIsInstance(err.exception.__context__, OSError)
2934 self.assertEqual(err.exception.__context__.args, ('flush',))
2935 self.assertFalse(txt.closed)
2936
2937 def test_nonnormalized_close_error_on_close(self):
2938 # Issue #21677
2939 buffer = self.BytesIO(self.testdata)
2940 def bad_flush():
2941 raise non_existing_flush
2942 def bad_close():
2943 raise non_existing_close
2944 buffer.close = bad_close
2945 txt = self.TextIOWrapper(buffer, encoding="ascii")
2946 txt.flush = bad_flush
2947 with self.assertRaises(NameError) as err: # exception not swallowed
2948 txt.close()
2949 self.assertIn('non_existing_close', str(err.exception))
2950 self.assertIsInstance(err.exception.__context__, NameError)
2951 self.assertIn('non_existing_flush', str(err.exception.__context__))
2952 self.assertFalse(txt.closed)
2953
Antoine Pitrou6be88762010-05-03 16:48:20 +00002954 def test_multi_close(self):
2955 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2956 txt.close()
2957 txt.close()
2958 txt.close()
2959 self.assertRaises(ValueError, txt.flush)
2960
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002961 def test_unseekable(self):
2962 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2963 self.assertRaises(self.UnsupportedOperation, txt.tell)
2964 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2965
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002966 def test_readonly_attributes(self):
2967 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2968 buf = self.BytesIO(self.testdata)
2969 with self.assertRaises(AttributeError):
2970 txt.buffer = buf
2971
Antoine Pitroue96ec682011-07-23 21:46:35 +02002972 def test_rawio(self):
2973 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2974 # that subprocess.Popen() can have the required unbuffered
2975 # semantics with universal_newlines=True.
2976 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2977 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2978 # Reads
2979 self.assertEqual(txt.read(4), 'abcd')
2980 self.assertEqual(txt.readline(), 'efghi\n')
2981 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2982
2983 def test_rawio_write_through(self):
2984 # Issue #12591: with write_through=True, writes don't need a flush
2985 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2986 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2987 write_through=True)
2988 txt.write('1')
2989 txt.write('23\n4')
2990 txt.write('5')
2991 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2992
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002993 def test_bufio_write_through(self):
2994 # Issue #21396: write_through=True doesn't force a flush()
2995 # on the underlying binary buffered object.
2996 flush_called, write_called = [], []
2997 class BufferedWriter(self.BufferedWriter):
2998 def flush(self, *args, **kwargs):
2999 flush_called.append(True)
3000 return super().flush(*args, **kwargs)
3001 def write(self, *args, **kwargs):
3002 write_called.append(True)
3003 return super().write(*args, **kwargs)
3004
3005 rawio = self.BytesIO()
3006 data = b"a"
3007 bufio = BufferedWriter(rawio, len(data)*2)
3008 textio = self.TextIOWrapper(bufio, encoding='ascii',
3009 write_through=True)
3010 # write to the buffered io but don't overflow the buffer
3011 text = data.decode('ascii')
3012 textio.write(text)
3013
3014 # buffer.flush is not called with write_through=True
3015 self.assertFalse(flush_called)
3016 # buffer.write *is* called with write_through=True
3017 self.assertTrue(write_called)
3018 self.assertEqual(rawio.getvalue(), b"") # no flush
3019
3020 write_called = [] # reset
3021 textio.write(text * 10) # total content is larger than bufio buffer
3022 self.assertTrue(write_called)
3023 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3024
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003025 def test_read_nonbytes(self):
3026 # Issue #17106
3027 # Crash when underlying read() returns non-bytes
3028 t = self.TextIOWrapper(self.StringIO('a'))
3029 self.assertRaises(TypeError, t.read, 1)
3030 t = self.TextIOWrapper(self.StringIO('a'))
3031 self.assertRaises(TypeError, t.readline)
3032 t = self.TextIOWrapper(self.StringIO('a'))
3033 self.assertRaises(TypeError, t.read)
3034
3035 def test_illegal_decoder(self):
3036 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003037 # Bypass the early encoding check added in issue 20404
3038 def _make_illegal_wrapper():
3039 quopri = codecs.lookup("quopri")
3040 quopri._is_text_encoding = True
3041 try:
3042 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3043 newline='\n', encoding="quopri")
3044 finally:
3045 quopri._is_text_encoding = False
3046 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003047 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003048 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003049 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003050 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003051 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003052 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003053 self.assertRaises(TypeError, t.read)
3054
Antoine Pitrou712cb732013-12-21 15:51:54 +01003055 def _check_create_at_shutdown(self, **kwargs):
3056 # Issue #20037: creating a TextIOWrapper at shutdown
3057 # shouldn't crash the interpreter.
3058 iomod = self.io.__name__
3059 code = """if 1:
3060 import codecs
3061 import {iomod} as io
3062
3063 # Avoid looking up codecs at shutdown
3064 codecs.lookup('utf-8')
3065
3066 class C:
3067 def __init__(self):
3068 self.buf = io.BytesIO()
3069 def __del__(self):
3070 io.TextIOWrapper(self.buf, **{kwargs})
3071 print("ok")
3072 c = C()
3073 """.format(iomod=iomod, kwargs=kwargs)
3074 return assert_python_ok("-c", code)
3075
3076 def test_create_at_shutdown_without_encoding(self):
3077 rc, out, err = self._check_create_at_shutdown()
3078 if err:
3079 # Can error out with a RuntimeError if the module state
3080 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003081 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003082 else:
3083 self.assertEqual("ok", out.decode().strip())
3084
3085 def test_create_at_shutdown_with_encoding(self):
3086 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3087 errors='strict')
3088 self.assertFalse(err)
3089 self.assertEqual("ok", out.decode().strip())
3090
Antoine Pitroub8503892014-04-29 10:14:02 +02003091 def test_read_byteslike(self):
3092 r = MemviewBytesIO(b'Just some random string\n')
3093 t = self.TextIOWrapper(r, 'utf-8')
3094
3095 # TextIOwrapper will not read the full string, because
3096 # we truncate it to a multiple of the native int size
3097 # so that we can construct a more complex memoryview.
3098 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3099
3100 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3101
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003102 def test_issue22849(self):
3103 class F(object):
3104 def readable(self): return True
3105 def writable(self): return True
3106 def seekable(self): return True
3107
3108 for i in range(10):
3109 try:
3110 self.TextIOWrapper(F(), encoding='utf-8')
3111 except Exception:
3112 pass
3113
3114 F.tell = lambda x: 0
3115 t = self.TextIOWrapper(F(), encoding='utf-8')
3116
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003117
Antoine Pitroub8503892014-04-29 10:14:02 +02003118class MemviewBytesIO(io.BytesIO):
3119 '''A BytesIO object whose read method returns memoryviews
3120 rather than bytes'''
3121
3122 def read1(self, len_):
3123 return _to_memoryview(super().read1(len_))
3124
3125 def read(self, len_):
3126 return _to_memoryview(super().read(len_))
3127
3128def _to_memoryview(buf):
3129 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3130
3131 arr = array.array('i')
3132 idx = len(buf) - len(buf) % arr.itemsize
3133 arr.frombytes(buf[:idx])
3134 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003135
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003136
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with ``io`` as the module under
    # test, plus a few tests that live only in this class.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-running __init__ with an invalid newline must fail, and the
        # object must then refuse to read.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object created without calling __init__ must raise.
        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            bufio = self.BufferedWriter(rawio)
            textio = self.TextIOWrapper(bufio, encoding="ascii")
            textio.write("456def")
            # Make the wrapper part of a reference cycle, then drop the
            # only outside reference to it.
            textio.x = textio
            wr = weakref.ref(textio)
            del textio
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
            self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()
3181
3182
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with ``pyio`` as the module under
    # test (read by the tests as ``self.io``).
    io = pyio
    # Message accepted on stderr by test_create_at_shutdown_without_encoding
    # when the child interpreter reports an error.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003186
3187
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(data, expected, **kwargs):
            # Decode, rewind to the saved state and decode again, so that
            # getstate() / setstate() are exercised as well as decode().
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        def _check_sequence(steps):
            # Each step is (input bytes, expected text, final flag); the
            # final keyword is only passed when the flag is set.
            for data, expected, final in steps:
                if final:
                    _check_decode(data, expected, final=True)
                else:
                    _check_decode(data, expected)

        _check_sequence([
            (b'\xe8\xa2\x88', "\u8888", False),

            (b'\xe8', "", False),
            (b'\xa2', "", False),
            (b'\x88', "\u8888", False),

            (b'\xe8', "", False),
            (b'\xa2', "", False),
            (b'\x88', "\u8888", False),

            (b'\xe8', "", False),
        ])
        # An incomplete sequence is pending, so finishing here must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_sequence([
            (b'\n', "\n", False),
            (b'\r', "", False),
            (b'', "\n", True),
            (b'\r', "\n", True),

            (b'\r', "", False),
            (b'a', "\na", False),

            (b'\r\r\n', "\n\n", False),
            (b'\r', "", False),
            (b'\r', "\n", False),
            (b'\na', "\na", False),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", False),
            (b'\xe8\xa2\x88', "\u8888", False),
            (b'\n', "\n", False),
            (b'\xe8\xa2\x88\r', "\u8888", False),
            (b'\n', "\n", False),
        ])

    def check_newline_decoding(self, decoder, encoding):
        decoded = []
        if encoding is None:
            encoder = None
            def _decode_bytewise(text):
                # Decode one char at a time
                for char in text:
                    decoded.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    decoded.append(decoder.decode(bytes([byte])))

        # Feed each text in turn and check which newline kinds the decoder
        # reports afterwards.
        self.assertEqual(decoder.newlines, None)
        expectations = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in expectations:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the newline bookkeeping starts over.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
3293
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; it only inherits the tests.
    # NOTE(review): self.IncrementalNewlineDecoder is not defined here --
    # presumably the C implementation is attached elsewhere; confirm.
    pass
3296
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; it only inherits the tests.
    # NOTE(review): self.IncrementalNewlineDecoder is not defined here --
    # presumably the pure-Python implementation is attached elsewhere;
    # confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003299
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003300
Guido van Rossum01a27522007-03-07 01:00:12 +00003301# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003302
Guido van Rossum5abbf752007-08-27 17:39:33 +00003303class MiscIOTest(unittest.TestCase):
3304
    def tearDown(self):
        # Remove the scratch file that the tests in this class create.
        support.unlink(support.TESTFN)
3307
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003308 def test___all__(self):
3309 for name in self.io.__all__:
3310 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003311 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003312 if name == "open":
3313 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003314 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003315 self.assertTrue(issubclass(obj, Exception), name)
3316 elif not name.startswith("SEEK_"):
3317 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003318
Barry Warsaw40e82462008-11-20 20:14:50 +00003319 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003320 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003321 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003322 f.close()
3323
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003324 with support.check_warnings(('', DeprecationWarning)):
3325 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003326 self.assertEqual(f.name, support.TESTFN)
3327 self.assertEqual(f.buffer.name, support.TESTFN)
3328 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3329 self.assertEqual(f.mode, "U")
3330 self.assertEqual(f.buffer.mode, "rb")
3331 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003332 f.close()
3333
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003334 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003335 self.assertEqual(f.mode, "w+")
3336 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3337 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003338
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003339 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003340 self.assertEqual(g.mode, "wb")
3341 self.assertEqual(g.raw.mode, "wb")
3342 self.assertEqual(g.name, f.fileno())
3343 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003344 f.close()
3345 g.close()
3346
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003347 def test_io_after_close(self):
3348 for kwargs in [
3349 {"mode": "w"},
3350 {"mode": "wb"},
3351 {"mode": "w", "buffering": 1},
3352 {"mode": "w", "buffering": 2},
3353 {"mode": "wb", "buffering": 0},
3354 {"mode": "r"},
3355 {"mode": "rb"},
3356 {"mode": "r", "buffering": 1},
3357 {"mode": "r", "buffering": 2},
3358 {"mode": "rb", "buffering": 0},
3359 {"mode": "w+"},
3360 {"mode": "w+b"},
3361 {"mode": "w+", "buffering": 1},
3362 {"mode": "w+", "buffering": 2},
3363 {"mode": "w+b", "buffering": 0},
3364 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003365 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003366 f.close()
3367 self.assertRaises(ValueError, f.flush)
3368 self.assertRaises(ValueError, f.fileno)
3369 self.assertRaises(ValueError, f.isatty)
3370 self.assertRaises(ValueError, f.__iter__)
3371 if hasattr(f, "peek"):
3372 self.assertRaises(ValueError, f.peek, 1)
3373 self.assertRaises(ValueError, f.read)
3374 if hasattr(f, "read1"):
3375 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003376 if hasattr(f, "readall"):
3377 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003378 if hasattr(f, "readinto"):
3379 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003380 if hasattr(f, "readinto1"):
3381 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003382 self.assertRaises(ValueError, f.readline)
3383 self.assertRaises(ValueError, f.readlines)
3384 self.assertRaises(ValueError, f.seek, 0)
3385 self.assertRaises(ValueError, f.tell)
3386 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003387 self.assertRaises(ValueError, f.write,
3388 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003389 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003390 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003392 def test_blockingioerror(self):
3393 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003394 class C(str):
3395 pass
3396 c = C("")
3397 b = self.BlockingIOError(1, c)
3398 c.b = b
3399 b.c = c
3400 wr = weakref.ref(c)
3401 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003402 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003403 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003404
3405 def test_abcs(self):
3406 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003407 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3408 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3409 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3410 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003411
3412 def _check_abc_inheritance(self, abcmodule):
3413 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003414 self.assertIsInstance(f, abcmodule.IOBase)
3415 self.assertIsInstance(f, abcmodule.RawIOBase)
3416 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3417 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003418 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003419 self.assertIsInstance(f, abcmodule.IOBase)
3420 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3421 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3422 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003423 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003424 self.assertIsInstance(f, abcmodule.IOBase)
3425 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3426 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3427 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003428
3429 def test_abc_inheritance(self):
3430 # Test implementations inherit from their respective ABCs
3431 self._check_abc_inheritance(self)
3432
3433 def test_abc_inheritance_official(self):
3434 # Test implementations inherit from the official ABCs of the
3435 # baseline "io" module.
3436 self._check_abc_inheritance(io)
3437
Antoine Pitroue033e062010-10-29 10:38:18 +00003438 def _check_warn_on_dealloc(self, *args, **kwargs):
3439 f = open(*args, **kwargs)
3440 r = repr(f)
3441 with self.assertWarns(ResourceWarning) as cm:
3442 f = None
3443 support.gc_collect()
3444 self.assertIn(r, str(cm.warning.args[0]))
3445
3446 def test_warn_on_dealloc(self):
3447 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3448 self._check_warn_on_dealloc(support.TESTFN, "wb")
3449 self._check_warn_on_dealloc(support.TESTFN, "w")
3450
3451 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3452 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003453 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003454 for fd in fds:
3455 try:
3456 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003457 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003458 if e.errno != errno.EBADF:
3459 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003460 self.addCleanup(cleanup_fds)
3461 r, w = os.pipe()
3462 fds += r, w
3463 self._check_warn_on_dealloc(r, *args, **kwargs)
3464 # When using closefd=False, there's no warning
3465 r, w = os.pipe()
3466 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003467 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003468 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003469
3470 def test_warn_on_dealloc_fd(self):
3471 self._check_warn_on_dealloc_fd("rb", buffering=0)
3472 self._check_warn_on_dealloc_fd("rb")
3473 self._check_warn_on_dealloc_fd("r")
3474
3475
Antoine Pitrou243757e2010-11-05 21:15:39 +00003476 def test_pickling(self):
3477 # Pickling file objects is forbidden
3478 for kwargs in [
3479 {"mode": "w"},
3480 {"mode": "wb"},
3481 {"mode": "wb", "buffering": 0},
3482 {"mode": "r"},
3483 {"mode": "rb"},
3484 {"mode": "rb", "buffering": 0},
3485 {"mode": "w+"},
3486 {"mode": "w+b"},
3487 {"mode": "w+b", "buffering": 0},
3488 ]:
3489 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3490 with self.open(support.TESTFN, **kwargs) as f:
3491 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3492
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003493 def test_nonblock_pipe_write_bigbuf(self):
3494 self._test_nonblock_pipe_write(16*1024)
3495
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003496 def test_nonblock_pipe_write_smallbuf(self):
3497 self._test_nonblock_pipe_write(1024)
3498
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003499 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3500 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003501 def _test_nonblock_pipe_write(self, bufsize):
3502 sent = []
3503 received = []
3504 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003505 os.set_blocking(r, False)
3506 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003507
3508 # To exercise all code paths in the C implementation we need
3509 # to play with buffer sizes. For instance, if we choose a
3510 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3511 # then we will never get a partial write of the buffer.
3512 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3513 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3514
3515 with rf, wf:
3516 for N in 9999, 73, 7574:
3517 try:
3518 i = 0
3519 while True:
3520 msg = bytes([i % 26 + 97]) * N
3521 sent.append(msg)
3522 wf.write(msg)
3523 i += 1
3524
3525 except self.BlockingIOError as e:
3526 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003527 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003528 sent[-1] = sent[-1][:e.characters_written]
3529 received.append(rf.read())
3530 msg = b'BLOCKED'
3531 wf.write(msg)
3532 sent.append(msg)
3533
3534 while True:
3535 try:
3536 wf.flush()
3537 break
3538 except self.BlockingIOError as e:
3539 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003540 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003541 self.assertEqual(e.characters_written, 0)
3542 received.append(rf.read())
3543
3544 received += iter(rf.read, None)
3545
3546 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003547 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003548 self.assertTrue(wf.closed)
3549 self.assertTrue(rf.closed)
3550
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003551 def test_create_fail(self):
3552 # 'x' mode fails if file is existing
3553 with self.open(support.TESTFN, 'w'):
3554 pass
3555 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3556
3557 def test_create_writes(self):
3558 # 'x' mode opens for writing
3559 with self.open(support.TESTFN, 'xb') as f:
3560 f.write(b"spam")
3561 with self.open(support.TESTFN, 'rb') as f:
3562 self.assertEqual(b"spam", f.read())
3563
Christian Heimes7b648752012-09-10 14:48:43 +02003564 def test_open_allargs(self):
3565 # there used to be a buffer overflow in the parser for rawmode
3566 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3567
3568
class CMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest checks against the `io` module and adds
    # checks that are only run for that implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # readinto() must reject a read() result larger than the target
        # buffer instead of copying it.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter that writes to sys.<stream_name> from a
        daemon thread and from the main thread, then check how it exits.

        The child must either exit successfully with nothing but '.' and
        '!' characters on stderr, or fail with the expected fatal error
        about not being able to acquire the stream's lock at shutdown.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only hold the characters the child
            # wrote itself.
            self.assertFalse(err.strip('.!'))
        else:
            # Failure: should be a fatal error
            expected = ("Fatal Python error: could not acquire lock "
                        "for <_io.BufferedWriter name='<{stream_name}>'> "
                        "at interpreter shutdown, possibly due to "
                        "daemon threads").format(stream_name=stream_name)
            self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
3622
3623
class PyMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest checks against the `pyio` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003626
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003627
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Shared tests for the interaction between SIGALRM and file objects.
    # The helpers open files through self.io; concrete subclasses provide
    # that attribute (see CSignalsTest and PySignalsTest).

    def setUp(self):
        # Install a handler that raises, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is repeated to build the data written through a file opened
        on a pipe with **fdopen_kwargs; *bytes* is the byte sequence whose
        first two bytes are expected to come out of the pipe.
        """
        read_results = []
        def _read():
            # Keep SIGALRM away from the reader thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below cannot hit
        # a NameError (hiding the real error) if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of writing to a file from a signal handler
        that interrupts a write to that same file.

        *data* is written repeatedly through a file opened on a pipe with
        **fdopen_kwargs until the SIGALRM handler fires and writes again.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel a still-pending alarm so that it cannot fire after
            # this test, once tearDown has restored the previous handler.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str before it is
        compared; **fdopen_kwargs is passed to self.io.open.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below cannot hit
        # a NameError (hiding the real error) if open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel a still-pending alarm so that it cannot fire after
            # this test, once tearDown has restored the previous handler.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated support.PIPE_MAX_SIZE times to build the data;
        **fdopen_kwargs is passed to self.io.open.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand any failure over to the main thread, which asserts
                # that no error was recorded.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the second handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader thread.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below cannot hit
        # a NameError (hiding the real error) if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel a still-pending alarm so that it cannot fire after
            # this test, once tearDown has restored the previous handler.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3835
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003836
class CSignalsTest(SignalsTest):
    # Runs the shared SignalsTest checks against the `io` module.
    io = io
3839
class PySignalsTest(SignalsTest):
    # Runs the shared SignalsTest checks against the `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3847
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003848
def load_tests(*args):
    """Build the suite for this module (unittest ``load_tests`` hook).

    Before the suite is built, every test class whose name starts with
    "C" or "Py" receives, as class attributes, the public names of the
    matching implementation (``io`` or ``pyio``) plus the matching mock
    classes, so that shared test bodies can refer to e.g. ``self.open``
    or ``self.MockRawIO``.  The positional arguments supplied by the
    protocol are accepted but not used.

    Returns a ``unittest.TestSuite`` with the tests of all listed classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    # Each mock has module-level "C..." and "Py..." variants; expose the
    # right variant under the unprefixed mock name.
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and absent from recent Python
    # versions; the default loader collects the same "test*" methods.
    loader = unittest.defaultTestLoader
    return unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
Guido van Rossum28524c72007-02-27 05:47:44 +00003884
if __name__ == "__main__":
    # Executed as a script: hand control over to unittest's runner.
    unittest.main()