blob: a7bb95b243d4f68388e4cc510f1387a370354355 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000048def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000050 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051 return f._CHUNK_SIZE
52
53
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Data to be "read" is queued up front as a sequence of chunks; a None
    chunk makes the next readinto() return None.  Everything written is
    recorded, and the number of read calls is counted.
    """

    def __init__(self, read_stack=()):
        # Counters first, then the two queues.
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    # -- capability flags and fixed answers ---------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # same remark as for seek()
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer --------------------------------------------------------

    def write(self, b):
        # Keep an immutable snapshot of what was written.
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        room = len(buf)
        if not self._read_stack:
            # Nothing queued any more: count the call and report EOF.
            self._extraneous_reads += 1
            return 0
        head = self._read_stack[0]
        if head is None:
            self._read_stack.pop(0)
            return None
        size = len(head)
        if size > room:
            # Chunk does not fit: hand out a prefix and requeue the rest.
            buf[:] = head[:room]
            self._read_stack[0] = head[room:]
            return room
        self._read_stack.pop(0)
        buf[:size] = head
        return size
109
# The read()-less mock layered over the C implementation's RawIOBase.
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

# The same mock layered over the pure-Python (_pyio) RawIOBase.
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
115
116
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() that pops queued chunks."""

    def read(self, n=None):
        # *n* is ignored: each call hands back the next queued chunk whole.
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read; any other
            # exception (e.g. KeyboardInterrupt) must propagate instead of
            # being silently turned into an EOF marker.
            self._extraneous_reads += 1
            return b""
126
# MockRawIO layered over the C implementation's RawIOBase.
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

# MockRawIO layered over the pure-Python (_pyio) RawIOBase.
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    # A raw stream that deliberately reports wrong results: doubled write
    # counts and read data, negative positions, and an oversized readinto()
    # count.

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def write(self, b):
        actual = super().write(b)
        return actual * 2

    def read(self, n=None):
        chunk = super().read(n)
        return chunk * 2

    def readinto(self, buf):
        # Perform the real transfer, then claim far more than fits in buf.
        super().readinto(buf)
        claimed = len(buf) * 5
        return claimed
150
# MisbehavedRawIO layered over the C implementation's RawIOBase.
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass

# MisbehavedRawIO layered over the pure-Python (_pyio) RawIOBase.
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
156
157
class CloseFailureIO(MockRawIO):
    # A raw stream whose first close() marks it closed and then raises
    # OSError; later close() calls do nothing.
    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
166class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
167 pass
168
169class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
170 pass
171
172
class MockFileIO:
    # Mixin that logs the size of every read()/readinto() result in
    # self.read_history while delegating the real work to the next class in
    # the MRO.

    def __init__(self, data):
        # The log must exist before the parent initialiser runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
# Read-recording mixin over the C implementation's BytesIO.
class CMockFileIO(MockFileIO, io.BytesIO):
    pass

# Read-recording mixin over the pure-Python (_pyio) BytesIO.
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
194
195
class MockUnseekableIO:
    # Mixin that makes a stream report itself as unseekable.  Concrete
    # subclasses must provide an ``UnsupportedOperation`` attribute naming
    # the exception class to raise.

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
208
# Unseekable BytesIO for the C implementation; raises io.UnsupportedOperation.
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation

# Unseekable BytesIO for the pure-Python implementation; raises
# _pyio.UnsupportedOperation.
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
214
215
class MockNonBlockWriterIO:
    # A writable raw-stream mock that can pretend to block: once a "blocker"
    # byte sequence is armed with block_on(), a write containing it is cut
    # short just before the blocker, and a write starting with it returns
    # None (would block) and disarms the blocker.

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        # Return everything written so far and reset the record in place.
        collected = b"".join(self._write_stack)
        del self._write_stack[:]
        return collected

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            # cut == -1: blocker absent, fall through to a full write
        self._write_stack.append(payload)
        return len(payload)
259
# Non-blocking writer mock over the C RawIOBase, paired with the C module's
# BlockingIOError.
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError

# Non-blocking writer mock over the pure-Python RawIOBase, paired with
# _pyio's BlockingIOError.
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
def setUp(self):
    # Make sure no scratch file is left over before the test body runs.
    scratch = support.TESTFN
    support.unlink(scratch)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
def tearDown(self):
    # Remove the scratch file the test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
def write_ops(self, f):
    """Run a fixed write/seek/truncate script against the binary stream *f*.

    Every return value of write(), seek(), tell() and truncate() is checked
    along the way.
    """
    eq = self.assertEqual

    eq(f.write(b"blah."), 5)
    # truncate() discards the data but is expected to leave the position.
    f.truncate(0)
    eq(f.tell(), 5)
    f.seek(0)

    eq(f.write(b"blah."), 5)
    eq(f.seek(0), 0)
    eq(f.write(b"Hello."), 6)
    eq(f.tell(), 6)
    eq(f.seek(-1, 1), 5)
    eq(f.tell(), 5)
    eq(f.write(bytearray(b" world\n\n\n")), 9)
    eq(f.seek(0), 0)
    eq(f.write(b"h"), 1)
    eq(f.seek(-1, 2), 13)
    eq(f.tell(), 13)

    eq(f.truncate(12), 12)
    eq(f.tell(), 13)
    # Float offsets are rejected.
    with self.assertRaises(TypeError):
        f.seek(0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
def read_ops(self, f, buffered=False):
    """Run a fixed read/readinto/seek script against the binary stream *f*.

    *f* is expected to contain b"hello world" followed by a newline.  When
    *buffered* is true, argument-less read() calls are checked as well.
    """
    eq = self.assertEqual

    first = f.read(5)
    eq(first, b"hello")
    scratch = bytearray(first)
    eq(f.readinto(scratch), 5)
    eq(scratch, b" worl")
    # Only two bytes remain; the buffer keeps its size.
    eq(f.readinto(scratch), 2)
    eq(len(scratch), 5)
    eq(scratch[:2], b"d\n")
    eq(f.seek(0), 0)
    eq(f.read(20), b"hello world\n")
    eq(f.read(1), b"")
    eq(f.readinto(bytearray(b"x")), 0)
    eq(f.seek(-6, 2), 6)
    eq(f.read(5), b"world")
    eq(f.read(0), b"")
    eq(f.readinto(bytearray()), 0)
    eq(f.seek(-6, 1), 5)
    eq(f.read(5), b" worl")
    eq(f.tell(), 10)
    with self.assertRaises(TypeError):
        f.seek(0.0)
    if not buffered:
        return
    f.seek(0)
    eq(f.read(), b"hello world\n")
    f.seek(6)
    eq(f.read(), b"world\n")
    eq(f.read(), b"")
324
# Offset used by the large-file tests: 2 GiB, written as a shift.
LARGE = 1 << 31
326
def large_file_ops(self, f):
    """Exercise seek/tell/write/truncate/read around offset self.LARGE.

    *f* must be a binary stream that is both readable and writable.
    """
    # Use unittest assertions rather than bare ``assert`` statements so the
    # preconditions are still enforced when Python runs with -O.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())

    big = self.LARGE
    eq = self.assertEqual

    eq(f.seek(big), big)
    eq(f.tell(), big)
    eq(f.write(b"xxx"), 3)
    eq(f.tell(), big + 3)
    eq(f.seek(-1, 1), big + 2)
    # truncate() without an argument cuts at the current position.
    eq(f.truncate(), big + 2)
    eq(f.tell(), big + 2)
    eq(f.seek(0, 2), big + 2)
    # An explicit truncate() shortens the file but leaves the position.
    eq(f.truncate(big + 1), big + 1)
    eq(f.tell(), big + 2)
    eq(f.seek(0, 2), big + 1)
    eq(f.seek(-1, 2), big)
    # Only one byte is left between the position and end of file.
    eq(f.read(2), b"x")
343
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = support.TESTFN

    # Write-only files (text, buffered binary, raw binary) refuse reads.
    write_only = (("w", {}), ("wb", {}), ("wb", {"buffering": 0}))
    for mode, extra in write_only:
        with self.open(path, mode, **extra) as fp:
            self.assertRaises(unsupported, fp.read)
            self.assertRaises(unsupported, fp.readline)

    # Read-only binary files (raw, then buffered) refuse writes.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "rb", **extra) as fp:
            self.assertRaises(unsupported, fp.write, b"blah")
            self.assertRaises(unsupported, fp.writelines, [b"blah\n"])

    with self.open(path, "r") as fp:
        self.assertRaises(unsupported, fp.write, "blah")
        self.assertRaises(unsupported, fp.writelines, ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(unsupported, fp.seek, 1, self.SEEK_CUR)
        self.assertRaises(unsupported, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported.
    # Each factory below builds an object that advertises only some of
    # fileno(), reading, writing and seeking; the loop then checks that the
    # advertised operations work and that the others raise OSError.

    def pipe_reader():
        read_fd, write_fd = os.pipe()
        os.close(write_fd)  # So that read() is harmless
        return self.FileIO(read_fd, "r")

    def pipe_writer():
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        # Guarantee that we can write into the pipe without blocking
        drain = threading.Thread(target=os.read, args=(read_fd, 100))
        drain.start()
        self.addCleanup(drain.join)
        return self.FileIO(write_fd, "w")

    def buffered_reader():
        raw = self.MockUnseekableIO()
        return self.BufferedReader(raw)

    def buffered_writer():
        raw = self.MockUnseekableIO()
        return self.BufferedWriter(raw)

    def buffered_random():
        raw = self.BytesIO()
        return self.BufferedRandom(raw)

    def buffered_rw_pair():
        reader = self.MockUnseekableIO()
        writer = self.MockUnseekableIO()
        return self.BufferedRWPair(reader, writer)

    def text_reader():
        # Unseekable buffer whose write side is BufferedIOBase's default.
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        # Unseekable buffer whose read side is BufferedIOBase's default.
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    # Ability letters: f = fileno(), r = readable, w = writable,
    # s = seekable.
    scenarios = [
        (pipe_reader, "fr"),
        (pipe_writer, "fw"),
        (buffered_reader, "r"),
        (buffered_writer, "w"),
        (buffered_random, "rws"),
        (buffered_rw_pair, "rw"),
        (text_reader, "r"),
        (text_writer, "w"),
        (self.BytesIO, "rws"),
        (self.StringIO, "rws"),
    ]
    for factory, abilities in scenarios:
        if factory is pipe_writer and not threading:
            continue  # Skip subtest that uses a background thread
        with self.subTest(factory), factory() as obj:
            can_read = "r" in abilities
            self.assertEqual(obj.readable(), can_read)
            can_write = "w" in abilities
            self.assertEqual(obj.writable(), can_write)

            # Pick a payload matching the stream's flavour.
            if isinstance(obj, self.TextIOBase):
                data = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                data = b"3"
            else:
                self.fail("Unknown base class")

            if "f" not in abilities:
                self.assertRaises(OSError, obj.fileno)
            else:
                obj.fileno()

            if not can_read:
                self.assertRaises(OSError, obj.read, 1)
                self.assertRaises(OSError, obj.read)
            else:
                obj.read(1)
                obj.read()

            if not can_write:
                self.assertRaises(OSError, obj.write, data)
            else:
                obj.write(data)

            on_windows = sys.platform.startswith("win")
            if on_windows and factory in (pipe_reader, pipe_writer):
                # Pipes seem to appear as seekable on Windows
                continue
            can_seek = "s" in abilities
            self.assertEqual(obj.seekable(), can_seek)

            if not can_seek:
                self.assertRaises(OSError, obj.tell)
                self.assertRaises(OSError, obj.seek, 0)
            else:
                obj.tell()
                obj.seek(0)

            if not (can_write and can_seek):
                self.assertRaises(OSError, obj.truncate)
                self.assertRaises(OSError, obj.truncate, 0)
            else:
                obj.truncate()
                obj.truncate(0)
472
Antoine Pitrou13348842012-01-29 18:36:34 +0100473 def test_open_handles_NUL_chars(self):
474 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300475 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
476 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100477
def test_raw_file_io(self):
    # Unbuffered file: check the advertised abilities in write mode and in
    # read mode, then run the shared write/read scripts.
    path = support.TESTFN
    with self.open(path, "wb", buffering=0) as f:
        for ability, expected in (("readable", False),
                                  ("writable", True),
                                  ("seekable", True)):
            self.assertEqual(getattr(f, ability)(), expected)
        self.write_ops(f)
    with self.open(path, "rb", buffering=0) as f:
        for ability, expected in (("readable", True),
                                  ("writable", False),
                                  ("seekable", True)):
            self.assertEqual(getattr(f, ability)(), expected)
        self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000489
def test_buffered_file_io(self):
    # Buffered file: check the advertised abilities in write mode and in
    # read mode, then run the shared write/read scripts (the read script
    # with its buffered-only checks enabled).
    path = support.TESTFN
    with self.open(path, "wb") as f:
        for ability, expected in (("readable", False),
                                  ("writable", True),
                                  ("seekable", True)):
            self.assertEqual(getattr(f, ability)(), expected)
        self.write_ops(f)
    with self.open(path, "rb") as f:
        for ability, expected in (("readable", True),
                                  ("writable", False),
                                  ("seekable", True)):
            self.assertEqual(getattr(f, ability)(), expected)
        self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000501
def test_readline(self):
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    # (readline arguments, expected result), in call order.
    script = [
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    ]
    with self.open(path, "rb") as f:
        for args, expected in script:
            self.assertEqual(f.readline(*args), expected)
        # A float size limit is rejected on binary files ...
        with self.assertRaises(TypeError):
            f.readline(5.3)
    with self.open(path, "r") as f:
        # ... and on text files.
        with self.assertRaises(TypeError):
            f.readline(5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000515
Guido van Rossum28524c72007-02-27 05:47:44 +0000516 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000517 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000518 self.write_ops(f)
519 data = f.getvalue()
520 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000521 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000522 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000523
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; it takes
    # a long time to build the >2GB file and takes >2GB of disk space,
    # therefore the resource must be enabled to run this test.
    expensive = sys.platform.startswith('win') or sys.platform == 'darwin'
    if expensive:
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # First unbuffered (buffering 0), then with default buffering.
    for buffering_args in ((0,), ()):
        with self.open(support.TESTFN, "w+b", *buffering_args) as f:
            self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000536
def test_with_open(self):
    # The file must be closed when the with block ends, both on normal
    # exit and when the body raises.
    for bufsize in (0, 1, 100):
        stream = None
        with self.open(support.TESTFN, "wb", bufsize) as stream:
            stream.write(b"xxx")
        self.assertEqual(stream.closed, True)

        stream = None
        try:
            with self.open(support.TESTFN, "wb", bufsize) as stream:
                1/0
        except ZeroDivisionError:
            self.assertEqual(stream.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
551
Antoine Pitrou08838b62009-01-21 00:55:13 +0000552 # issue 5008
def test_append_mode_tell(self):
    # A file opened for appending must report a position at the end of
    # the existing data straight away.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"xxx")
    # Raw first, then buffered binary.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "ab", **extra) as f:
            self.assertEqual(f.tell(), 3)
    with self.open(path, "a") as f:
        self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000562
def test_destructor(self):
    # Dropping the last reference to an open FileIO subclass must run
    # __del__, close() and flush() in that order, and the written data
    # must reach the file.
    record = []

    class MyFileIO(self.FileIO):
        def __del__(self):
            record.append(1)
            # Chain to the parent's __del__ only if it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588
def _check_base_destructor(self, base):
    # Subclass *base*, drop the only instance and verify that __del__,
    # close() and flush() ran in that order.
    record = []

    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3

        def __del__(self):
            record.append(self.on_del)
            # Chain to the parent's __del__ only if it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(self.on_close)
            super().close()

        def flush(self):
            record.append(self.on_flush)
            super().flush()

    instance = MyIO()
    del instance
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
618
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000619 def test_IOBase_destructor(self):
620 self._check_base_destructor(self.IOBase)
621
622 def test_RawIOBase_destructor(self):
623 self._check_base_destructor(self.RawIOBase)
624
625 def test_BufferedIOBase_destructor(self):
626 self._check_base_destructor(self.BufferedIOBase)
627
628 def test_TextIOBase_destructor(self):
629 self._check_base_destructor(self.TextIOBase)
630
def test_close_flushes(self):
    # Data written through a buffered file must be on disk once the file
    # has been closed by the with block.
    path = support.TESTFN
    with self.open(path, "wb") as writer:
        writer.write(b"xxx")
    with self.open(path, "rb") as reader:
        content = reader.read()
        self.assertEqual(content, b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000636
def test_array_writes(self):
    # write() must accept an array.array and report its size in bytes,
    # for both the raw (buffering 0) and the buffered file.
    values = array.array('i', range(10))
    byte_count = len(values.tobytes())
    for buffering_args in ((0,), ()):
        with self.open(support.TESTFN, "wb", *buffering_args) as f:
            self.assertEqual(f.write(values), byte_count)
Guido van Rossumd4103952007-04-12 05:44:49 +0000644
def test_closefd(self):
    # closefd=False is rejected when opening by file name for writing.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000648
def test_read_closed(self):
    # Reading from a closed wrapper must raise ValueError even though the
    # wrapper was opened with closefd=False on a descriptor that is still
    # open.
    path = support.TESTFN
    with self.open(path, "w") as f:
        f.write("egg\n")
    with self.open(path, "r") as f:
        alias = self.open(f.fileno(), "r", closefd=False)
        self.assertEqual(alias.read(), "egg\n")
        alias.seek(0)
        alias.close()
        with self.assertRaises(ValueError):
            alias.read()
658
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000662
def test_closefd_attr(self):
    # The raw layer's closefd attribute must mirror the closefd argument
    # given to open().
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"egg\n")
    with self.open(path, "r") as f:
        owner_raw = f.buffer.raw
        self.assertEqual(owner_raw.closefd, True)
        alias = self.open(f.fileno(), "r", closefd=False)
        alias_raw = alias.buffer.raw
        self.assertEqual(alias_raw.closefd, False)
670
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    path = support.TESTFN
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(path, "wb")
        raw.write(b"abcxxx")
        # Put the object in a reference cycle so that only the cyclic
        # collector can reclaim it.
        raw.f = raw
        wr = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(wr(), wr)
    with self.open(path, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000684
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2GB of memory")
    # Raw binary, buffered binary and text: an unlimited read() must
    # overflow in each case.
    variants = (("rb", {"buffering": 0}), ("rb", {}), ("r", {}))
    for mode, extra in variants:
        with self.open(zero, mode, **extra) as f:
            self.assertRaises(OverflowError, f.read)
700
def check_flush_error_on_close(self, *args, **kwargs):
    """Open a file with self.open(*args, **kwargs) and break its flush().

    Checks that close() propagates the OSError raised by flush(), that the
    file ends up closed anyway, and that flush() ran while the file was
    still open.
    """
    stream = self.open(*args, **kwargs)
    seen_closed = []

    def bad_flush():
        # Remember whether the file was already closed, then fail.
        seen_closed[:] = [stream.closed]
        raise OSError()

    stream.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(seen_closed)      # flush() called
    self.assertFalse(seen_closed[0])  # flush() called before file closed
    stream.flush = lambda: None       # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200715
716 def test_flush_error_on_close(self):
717 # raw file
718 # Issue #5700: io.FileIO calls flush() after file closed
719 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
720 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
721 self.check_flush_error_on_close(fd, 'wb', buffering=0)
722 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
723 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
724 os.close(fd)
725 # buffered io
726 self.check_flush_error_on_close(support.TESTFN, 'wb')
727 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
728 self.check_flush_error_on_close(fd, 'wb')
729 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
730 self.check_flush_error_on_close(fd, 'wb', closefd=False)
731 os.close(fd)
732 # text io
733 self.check_flush_error_on_close(support.TESTFN, 'w')
734 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
735 self.check_flush_error_on_close(fd, 'w')
736 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
737 self.check_flush_error_on_close(fd, 'w', closefd=False)
738 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000739
740 def test_multi_close(self):
741 f = self.open(support.TESTFN, "wb", buffering=0)
742 f.close()
743 f.close()
744 f.close()
745 self.assertRaises(ValueError, f.flush)
746
Antoine Pitrou328ec742010-09-14 18:37:24 +0000747 def test_RawIOBase_read(self):
748 # Exercise the default RawIOBase.read() implementation (which calls
749 # readinto() internally).
750 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
751 self.assertEqual(rawio.read(2), b"ab")
752 self.assertEqual(rawio.read(2), b"c")
753 self.assertEqual(rawio.read(2), b"d")
754 self.assertEqual(rawio.read(2), None)
755 self.assertEqual(rawio.read(2), b"ef")
756 self.assertEqual(rawio.read(2), b"g")
757 self.assertEqual(rawio.read(2), None)
758 self.assertEqual(rawio.read(2), b"")
759
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400760 def test_types_have_dict(self):
761 test = (
762 self.IOBase(),
763 self.RawIOBase(),
764 self.TextIOBase(),
765 self.StringIO(),
766 self.BytesIO()
767 )
768 for obj in test:
769 self.assertTrue(hasattr(obj, "__dict__"))
770
Ross Lagerwall59142db2011-10-31 20:34:46 +0200771 def test_opener(self):
772 with self.open(support.TESTFN, "w") as f:
773 f.write("egg\n")
774 fd = os.open(support.TESTFN, os.O_RDONLY)
775 def opener(path, flags):
776 return fd
777 with self.open("non-existent", "r", opener=opener) as f:
778 self.assertEqual(f.read(), "egg\n")
779
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200780 def test_fileio_closefd(self):
781 # Issue #4841
782 with self.open(__file__, 'rb') as f1, \
783 self.open(__file__, 'rb') as f2:
784 fileio = self.FileIO(f1.fileno(), closefd=False)
785 # .__init__() must not close f1
786 fileio.__init__(f2.fileno(), closefd=False)
787 f1.readline()
788 # .close() must not close f2
789 fileio.close()
790 f2.readline()
791
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300792 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200793 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300794 with self.assertRaises(ValueError):
795 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300796
797 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200798 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300799 with self.assertRaises(ValueError):
800 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300801
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200802
class CIOTest(IOTest):
    # IOTest variant with one extra test for the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference: the instance can only be reclaimed by the GC.
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class before the instance, then let the collector run.
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000822
class PyIOTest(IOTest):
    # Adds nothing of its own: every test is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000825
Guido van Rossuma9e20242007-03-08 00:43:48 +0000826
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the RawIOBase API of the io and pyio modules in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
840
841
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The buffered type under test is read from the ``tp`` attribute.

    def test_detach(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        # A second detach() must fail.
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() of the buffered object must report 42.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, whence)

    def test_override_destructor(self):
        # Overridden __del__(), close() and flush() must be called, in that
        # order, when the object is destroyed.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    inherited = super().__del__
                except AttributeError:
                    # The base type has no __del__ to chain to.
                    pass
                else:
                    inherited()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())
        with buffered:
            pass
        # buffered should now be closed, and using it a second time should
        # raise a ValueError.
        with self.assertRaises(ValueError):
            with buffered:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            # The buffered object is a temporary, never bound to a name.
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception OSError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified type name, plus the raw object's name
        # once it has one (str or bytes).
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(buffered), "<%s>" % qualified)
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % qualified)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % qualified)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        observed = []

        def failing_flush():
            # Record the closed state of both layers when flush() runs.
            observed[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(observed)       # flush() called
        self.assertFalse(observed[0])   # flush() called before file closed
        self.assertFalse(observed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, close() must raise the
        # close error with the flush error chained as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        def failing_flush():
            # Deliberately references an undefined name -> NameError.
            raise non_existing_flush

        def failing_close():
            # Deliberately references an undefined name -> NameError.
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards must
        # raise ValueError.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() must raise UnsupportedOperation on top of an
        # unseekable raw stream.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute of a buffered object cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
997
Guido van Rossum78892e42007-04-06 17:31:18 +0000998
class SizeofTest:
    # Checks that sys.getsizeof() of a buffered object tracks the size of
    # its internal buffer.

    @support.cpython_only
    def test_sizeof(self):
        # The size excluding the buffer must not depend on buffer_size.
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the size.
        buffer_bytes = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=buffer_bytes)
        overhead = sys.getsizeof(bufio) - buffer_bytes
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001020
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered readers.  The reader type under test is read from
    # the ``tp`` attribute.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on a live object; invalid buffer
        # sizes must be rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only must be destroyable, must
        # refuse to read, and must work once __init__() has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() is checked against the number of raw reads it triggers.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Short read: only the first byte of b is overwritten.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # readinto1() is checked against the number of raw reads it
        # triggers.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array.
        # method_name is the name of the bufio method to call on the array.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Precondition check; a unittest assertion (rather than a bare
        # assert statement) so that it is still checked under -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader over the same three raw chunks for each call.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected rawio.read_history]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns everything there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns all chunks joined together.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Record for the assertion below, then re-raise.
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() must raise UnsupportedOperation both before and
        # after a read has been done.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() on top of MisbehavedRawIO must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this uses io.BufferedReader directly rather than
        # self.tp, so subclasses with a different tp still exercise the io
        # type here -- confirm whether that is intentional.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)
1296
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001297
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests (plus the size tests) against io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, read() must raise ValueError.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(rawio)
            # Self-reference: only the cyclic GC can reclaim the reader.
            reader.f = reader
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1344
1345
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001348
Guido van Rossuma9e20242007-03-08 00:43:48 +00001349
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001350class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1351 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001352
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001353 def test_constructor(self):
1354 rawio = self.MockRawIO()
1355 bufio = self.tp(rawio)
1356 bufio.__init__(rawio)
1357 bufio.__init__(rawio, buffer_size=1024)
1358 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001359 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 bufio.flush()
1361 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1362 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1363 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1364 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001365 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001366 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001367 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001368
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001369 def test_uninitialized(self):
1370 bufio = self.tp.__new__(self.tp)
1371 del bufio
1372 bufio = self.tp.__new__(self.tp)
1373 self.assertRaisesRegex((ValueError, AttributeError),
1374 'uninitialized|has no attribute',
1375 bufio.write, b'')
1376 bufio.__init__(self.MockRawIO())
1377 self.assertEqual(bufio.write(b''), 0)
1378
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001379 def test_detach_flush(self):
1380 raw = self.MockRawIO()
1381 buf = self.tp(raw)
1382 buf.write(b"howdy!")
1383 self.assertFalse(raw._write_stack)
1384 buf.detach()
1385 self.assertEqual(raw._write_stack, [b"howdy!"])
1386
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001387 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001388 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001389 writer = self.MockRawIO()
1390 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001391 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001392 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001393
def test_write_overflow(self):
    # Feed 16 bytes, 3 at a time, through an 8-byte buffer.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    payload = b"abcdefghijklmnop"
    offset = 0
    while offset < len(payload):
        buffered.write(payload[offset:offset + 3])
        offset += 3
    written = b"".join(raw_stream._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(written.startswith(payload[:-8]), written)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001404
def check_writes(self, intermediate_func):
    # Helper: perform lots of writes of growing size through a 13-byte
    # buffer, calling intermediate_func(bufio) after each one, and check
    # that the flushed output matches what was written.
    data = bytes(range(256)) * 1000
    total = len(data)
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 13)
    # Write sizes: each N is repeated 15 times before moving on to N+1.
    size_iter = (size for size in count(1) for _ in range(15))
    pos = 0
    while pos < total:
        # Never ask for more than what is left of the data.
        chunk_len = min(next(size_iter), total - pos)
        self.assertEqual(buffered.write(data[pos:pos + chunk_len]), chunk_len)
        intermediate_func(buffered)
        pos += chunk_len
    buffered.flush()
    self.assertEqual(data, b"".join(raw_stream._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001424
def test_writes(self):
    # Plain sequence of writes: nothing happens between two writes.
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001427
def test_writes_and_flushes(self):
    # Same as test_writes, with an explicit flush() after every write.
    def flush_after_write(bufio):
        return bufio.flush()
    self.check_writes(flush_after_write)
Guido van Rossum01a27522007-03-07 01:00:12 +00001430
def test_writes_and_seeks(self):
    # Interleave writes with seeks that always end up back at the position
    # where they started.
    def seek_absolute(bufio):
        # Absolute seeks: one byte forward, one byte back, then home.
        here = bufio.tell()
        for target in (here + 1, here - 1, here):
            bufio.seek(target, 0)

    def seek_relative(bufio):
        # Relative seeks, then an absolute seek back to the start position.
        here = bufio.seek(0, 1)
        bufio.seek(+1, 1)
        bufio.seek(-1, 1)
        bufio.seek(here, 0)

    for hook in (seek_absolute, seek_relative):
        self.check_writes(hook)
Guido van Rossum01a27522007-03-07 01:00:12 +00001444
def test_writes_and_truncates(self):
    # Same as test_writes, truncating at the current position after every
    # write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001447
def test_write_non_blocking(self):
    # Exercise write() on top of a raw stream that can stop accepting
    # data part-way through a write.
    raw = self.MockNonBlockWriterIO()
    writer = self.tp(raw, 8)

    # Both of these writes are accepted in full.
    for chunk in (b"abcd", b"efghi"):
        self.assertEqual(writer.write(chunk), len(chunk))
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    self.assertEqual(writer.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        writer.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        accepted = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(accepted, 16)
    flushed = raw.pop_written()
    self.assertEqual(flushed, b"abcdefghijklmnopqrwxyz")

    self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
    flushed = raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001474
def test_write_and_rewind(self):
    # Write, seek back to overwrite the start, then append at the end; the
    # raw stream must reflect each step.
    sink = io.BytesIO()
    buffered = self.tp(sink, 4)
    self.assertEqual(buffered.write(b"abcdef"), 6)
    self.assertEqual(buffered.tell(), 6)
    buffered.seek(0, 0)
    self.assertEqual(buffered.write(b"XY"), 2)
    buffered.seek(6, 0)
    # By now the two overwritten bytes are visible in the raw stream.
    self.assertEqual(sink.getvalue(), b"XYcdef")
    self.assertEqual(buffered.write(b"123456"), 6)
    buffered.flush()
    self.assertEqual(sink.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001487
def test_flush(self):
    # flush() hands the buffered bytes to the raw stream.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_chunk = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001494
def test_writelines(self):
    # writelines() with a plain list of bytes objects.
    lines = [b'ab', b'cd', b'ef']
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.writelines(lines)
    buffered.flush()
    written = b''.join(raw_stream._write_stack)
    self.assertEqual(written, b'abcdef')
1502
def test_writelines_userlist(self):
    # writelines() with a UserList instead of a real list.
    lines = UserList([b'ab', b'cd', b'ef'])
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.writelines(lines)
    buffered.flush()
    written = b''.join(raw_stream._write_stack)
    self.assertEqual(written, b'abcdef')
1510
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None and
    # for a str.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        self.assertRaises(TypeError, buffered.writelines, bad_lines)
1517
def test_destructor(self):
    # Dropping the last reference to the buffered object must flush the
    # pending data to the raw stream.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    first_chunk = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    #
    # This test creates a real file on disk.  Register its removal up
    # front so that it is deleted whether the assertions below pass or
    # fail, instead of being left behind for later tests.
    self.addCleanup(support.unlink, support.TESTFN)
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
        bufio = self.tp(raw, 8)
        bufio.write(b"abcdef")
        self.assertEqual(bufio.truncate(3), 3)
        # truncate() must not move the stream position.
        self.assertEqual(bufio.tell(), 6)
    # Re-read the file unbuffered to check what actually hit the disk.
    with self.open(support.TESTFN, "rb", buffering=0) as f:
        self.assertEqual(f.read(), b"abc")
1535
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    # Write out many bytes from many threads and test they were
    # all flushed.
    try:
        N = 1000
        data = bytes(range(256)) * N
        # Pre-cut the data into chunks of alternating sizes 1 and 19.
        pending = deque()
        offset = 0
        for step in cycle([1, 19]):
            if offset >= len(data):
                break
            pending.append(data[offset:offset + step])
            offset += step
        del data
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def worker():
                # Drain the shared queue; record any failure so the main
                # thread can report it, then let it propagate.
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=worker) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as f:
            result = f.read()
        # Order is unspecified, but every byte value must appear N times.
        for value in range(256):
            self.assertEqual(result.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1583
def test_misbehaved_io(self):
    # On top of a misbehaving raw stream, seek(), tell() and an
    # over-sized write() must all raise OSError.
    raw_stream = self.MisbehavedRawIO()
    buffered = self.tp(raw_stream, 5)
    with self.assertRaises(OSError):
        buffered.seek(0)
    with self.assertRaises(OSError):
        buffered.tell()
    with self.assertRaises(OSError):
        buffered.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001590
def test_max_buffer_size_removal(self):
    # A third positional argument (the old max_buffer_size) is rejected.
    self.assertRaises(TypeError, self.tp, self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001594
def test_write_error_on_close(self):
    # If flushing fails while closing, the error must reach the caller
    # and the object must still end up closed.
    def failing_write(data):
        raise OSError()

    raw_stream = self.MockRawIO()
    raw_stream.write = failing_write
    buffered = self.tp(raw_stream)
    buffered.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
1604
Benjamin Peterson59406a92009-03-26 17:10:29 +00001605
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds a few checks of its own.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After each failed re-initialization the object must refuse
        # writes with ValueError.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        #
        # The test creates a real file; register its removal up front so
        # it is deleted whether the assertions below pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Put the object in a reference cycle so that only the cyclic
            # garbage collector can reclaim it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments must name BufferedWriter.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1649
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001650
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001653
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  Subclasses set `tp` to the
    # implementation under test.

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        rw_pair = self.tp(reader, writer)
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # An object produced by __new__ alone must fail cleanly on read()
        # and write(), and must work once __init__ has been called.
        cls = self.tp
        throwaway = cls.__new__(cls)
        del throwaway
        rw_pair = cls.__new__(cls)
        expected_errors = (ValueError, AttributeError)
        pattern = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, pattern, rw_pair.read, 0)
        self.assertRaisesRegex(expected_errors, pattern, rw_pair.write, b'')
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument (the old max_buffer_size) is
        # rejected.
        self.assertRaises(TypeError, self.tp,
                          self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Sized reads, then a read of whatever is left.
        self.assertEqual(rw_pair.read(3), b"abc")
        self.assertEqual(rw_pair.read(1), b"d")
        self.assertEqual(rw_pair.read(), b"ef")
        # read(None) returns everything.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            # A brand new pair over the same three-line input.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed on its own, so each one shows up as a
        # separate entry on the writer side.
        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking did not consume anything.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        # The writer side was still closed despite the reader failure.
        self.assertTrue(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        # The reader side was still closed despite the writer failure.
        self.assertFalse(rw_pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing

        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        writer.close = writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        # The reader error is the one raised; the writer error is chained
        # to it as its context.
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        cases = (
            (False, False, self.assertFalse),
            (True, False, self.assertTrue),
            (False, True, self.assertTrue),
            (True, True, self.assertTrue),
        )
        for reader_tty, writer_tty, check in cases:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            check(rw_pair.isatty())

    def test_weakref_clearing(self):
        # Drop the object first, then the weak reference to it.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        weak = weakref.ref(rw_pair)
        rw_pair = None
        weak = None # Shouldn't segfault.
1837
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001840
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001844
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom: everything inherited from the
    # reader and writer tests, plus tests mixing reads and writes.
    # Subclasses set `tp` to the implementation under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited versions explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        # Run both inherited versions explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw_stream = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw_stream, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw_stream._write_stack) # Buffer writes
        # The read flushes the pending writes as a single chunk.
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw_stream._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Helper: interleave writes and flushes with reads performed by
        # read_func(bufio[, n]).
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream directly, bypassing the buffered object.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def plain_read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(plain_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(bufio, n=-1):
            # Without an explicit size, use a buffer large enough for
            # everything.
            if n >= 0:
                target = bytearray(n)
            else:
                target = bytearray(9999)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, stream.read())

    def test_threads(self):
        # Run both inherited versions explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(bufio):
            bufio.peek(1)

        def peek_one_back(bufio):
            # Step one byte back, peek, then restore the position.
            here = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(here, 0)

        for hook in (peek_in_place, peek_one_back):
            self.check_writes(hook)

    def test_writes_and_reads(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_pos)
            result = backing.getvalue()
            self.assertEqual(result,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, i, j)
                stream.flush()
                # The later write (at i) wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        self.assertEqual(stream.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both inherited versions explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            # Alternate one-byte writes with every flavour of read.
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            # Each one-byte write replaces the first byte of the next line.
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2072
R David Murray67bfe802013-02-23 21:51:05 -05002073
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom and
    # adds a few checks of its own.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        reinit = buffered.__init__
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reinit(raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader and the writer versions explicitly.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments must name BufferedRandom.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2095
2096
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
2099
2100
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002101# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2102# properties:
2103# - A single output character can correspond to many bytes of input.
2104# - The number of input bytes to complete the character can be
2105# undetermined until the last input byte is received.
2106# - The number of input bytes can vary depending on previous input.
2107# - A single input byte can correspond to many characters of output.
2108# - The number of output characters can be undetermined until the
2109# last input byte is received.
2110# - The number of output characters can vary depending on previous input.
2111
2112class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2113 """
2114 For testing seek/tell behavior with a stateful, buffering decoder.
2115
2116 Input is a sequence of words. Words may be fixed-length (length set
2117 by input) or variable-length (period-terminated). In variable-length
2118 mode, extra periods are ignored. Possible words are:
2119 - 'i' followed by a number sets the input length, I (maximum 99).
2120 When I is set to 0, words are period-terminated.
2121 - 'o' followed by a number sets the output length, O (maximum 99).
2122 - Any other word is converted into a word followed by a period on
2123 the output. The output word consists of the input word truncated
2124 or padded out with hyphens to make its length equal to O. If O
2125 is 0, the word is output verbatim without truncating or padding.
2126 I and O are initially set to 1. When I changes, any buffered input is
2127 re-scanned according to the new I. EOF also terminates the last word.
2128 """
2129
2130 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002131 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002132 self.reset()
2133
2134 def __repr__(self):
2135 return '<SID %x>' % id(self)
2136
2137 def reset(self):
2138 self.i = 1
2139 self.o = 1
2140 self.buffer = bytearray()
2141
2142 def getstate(self):
2143 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2144 return bytes(self.buffer), i*100 + o
2145
2146 def setstate(self, state):
2147 buffer, io = state
2148 self.buffer = bytearray(buffer)
2149 i, o = divmod(io, 100)
2150 self.i, self.o = i ^ 1, o ^ 1
2151
2152 def decode(self, input, final=False):
2153 output = ''
2154 for b in input:
2155 if self.i == 0: # variable-length, terminated with period
2156 if b == ord('.'):
2157 if self.buffer:
2158 output += self.process_word()
2159 else:
2160 self.buffer.append(b)
2161 else: # fixed-length, terminate after self.i bytes
2162 self.buffer.append(b)
2163 if len(self.buffer) == self.i:
2164 output += self.process_word()
2165 if final and self.buffer: # EOF terminates the last word
2166 output += self.process_word()
2167 return output
2168
2169 def process_word(self):
2170 output = ''
2171 if self.buffer[0] == ord('i'):
2172 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2173 elif self.buffer[0] == ord('o'):
2174 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2175 else:
2176 output = self.buffer.decode('ascii')
2177 if len(output) < self.o:
2178 output += '-'*self.o # pad out with hyphens
2179 if self.o:
2180 output = output[:self.o] # truncate to output length
2181 output += '.'
2182 self.buffer = bytearray()
2183 return output
2184
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002185 codecEnabled = False
2186
2187 @classmethod
2188 def lookupTestDecoder(cls, name):
2189 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002190 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002191 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002192 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002193 incrementalencoder=None,
2194 streamreader=None, streamwriter=None,
2195 incrementaldecoder=cls)
2196
# Register the decoder defined above as a codec search function; it answers
# only to the name 'test_decoder'.
# Disabled by default (codecEnabled is False), tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2200
2201
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # One-shot decoding: a fresh decoder per case.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, eof), expected)

        # An unfinished decode yields nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002244
2245class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002246
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002247 def setUp(self):
2248 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2249 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002250 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002251
    def tearDown(self):
        # Remove the scratch file that individual tests create at TESTFN.
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 def test_constructor(self):
2256 r = self.BytesIO(b"\xc3\xa9\n\n")
2257 b = self.BufferedReader(r, 1000)
2258 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002259 t.__init__(b, encoding="latin-1", newline="\r\n")
2260 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002261 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002262 t.__init__(b, encoding="utf-8", line_buffering=True)
2263 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002264 self.assertEqual(t.line_buffering, True)
2265 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002266 self.assertRaises(TypeError, t.__init__, b, newline=42)
2267 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2268
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002269 def test_uninitialized(self):
2270 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2271 del t
2272 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2273 self.assertRaises(Exception, repr, t)
2274 self.assertRaisesRegex((ValueError, AttributeError),
2275 'uninitialized|has no attribute',
2276 t.read, 0)
2277 t.__init__(self.MockRawIO())
2278 self.assertEqual(t.read(0), '')
2279
Nick Coghlana9b15242014-02-04 22:11:18 +10002280 def test_non_text_encoding_codecs_are_rejected(self):
2281 # Ensure the constructor complains if passed a codec that isn't
2282 # marked as a text encoding
2283 # http://bugs.python.org/issue20404
2284 r = self.BytesIO()
2285 b = self.BufferedWriter(r)
2286 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2287 self.TextIOWrapper(b, encoding="hex")
2288
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002289 def test_detach(self):
2290 r = self.BytesIO()
2291 b = self.BufferedWriter(r)
2292 t = self.TextIOWrapper(b)
2293 self.assertIs(t.detach(), b)
2294
2295 t = self.TextIOWrapper(b, encoding="ascii")
2296 t.write("howdy")
2297 self.assertFalse(r.getvalue())
2298 t.detach()
2299 self.assertEqual(r.getvalue(), b"howdy")
2300 self.assertRaises(ValueError, t.detach)
2301
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002302 # Operations independent of the detached stream should still work
2303 repr(t)
2304 self.assertEqual(t.encoding, "ascii")
2305 self.assertEqual(t.errors, "strict")
2306 self.assertFalse(t.line_buffering)
2307
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002308 def test_repr(self):
2309 raw = self.BytesIO("hello".encode("utf-8"))
2310 b = self.BufferedReader(raw)
2311 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002312 modname = self.TextIOWrapper.__module__
2313 self.assertEqual(repr(t),
2314 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2315 raw.name = "dummy"
2316 self.assertEqual(repr(t),
2317 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002318 t.mode = "r"
2319 self.assertEqual(repr(t),
2320 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002321 raw.name = b"dummy"
2322 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002323 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002324
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002325 t.buffer.detach()
2326 repr(t) # Should not raise an exception
2327
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 def test_line_buffering(self):
2329 r = self.BytesIO()
2330 b = self.BufferedWriter(r, 1000)
2331 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002332 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002333 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002334 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002335 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002336 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002337 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002338
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002339 def test_default_encoding(self):
2340 old_environ = dict(os.environ)
2341 try:
2342 # try to get a user preferred encoding different than the current
2343 # locale encoding to check that TextIOWrapper() uses the current
2344 # locale encoding and not the user preferred encoding
2345 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2346 if key in os.environ:
2347 del os.environ[key]
2348
2349 current_locale_encoding = locale.getpreferredencoding(False)
2350 b = self.BytesIO()
2351 t = self.TextIOWrapper(b)
2352 self.assertEqual(t.encoding, current_locale_encoding)
2353 finally:
2354 os.environ.clear()
2355 os.environ.update(old_environ)
2356
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002357 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002358 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002359 # Issue 15989
2360 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002361 b = self.BytesIO()
2362 b.fileno = lambda: _testcapi.INT_MAX + 1
2363 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2364 b.fileno = lambda: _testcapi.UINT_MAX + 1
2365 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2366
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 def test_encoding(self):
2368 # Check the encoding attribute is always set, and valid
2369 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002370 t = self.TextIOWrapper(b, encoding="utf-8")
2371 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002373 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002374 codecs.lookup(t.encoding)
2375
2376 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002377 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002378 b = self.BytesIO(b"abc\n\xff\n")
2379 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002380 self.assertRaises(UnicodeError, t.read)
2381 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002382 b = self.BytesIO(b"abc\n\xff\n")
2383 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002384 self.assertRaises(UnicodeError, t.read)
2385 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002386 b = self.BytesIO(b"abc\n\xff\n")
2387 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002388 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002389 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002390 b = self.BytesIO(b"abc\n\xff\n")
2391 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002392 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002393
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002394 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002395 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002396 b = self.BytesIO()
2397 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002398 self.assertRaises(UnicodeError, t.write, "\xff")
2399 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002400 b = self.BytesIO()
2401 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002402 self.assertRaises(UnicodeError, t.write, "\xff")
2403 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002404 b = self.BytesIO()
2405 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002406 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002407 t.write("abc\xffdef\n")
2408 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002409 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002410 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002411 b = self.BytesIO()
2412 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002413 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002414 t.write("abc\xffdef\n")
2415 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002416 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002417
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002418 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002419 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2420
2421 tests = [
2422 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002423 [ '', input_lines ],
2424 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2425 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2426 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002427 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002428 encodings = (
2429 'utf-8', 'latin-1',
2430 'utf-16', 'utf-16-le', 'utf-16-be',
2431 'utf-32', 'utf-32-le', 'utf-32-be',
2432 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002433
Guido van Rossum8358db22007-08-18 21:39:55 +00002434 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002435 # character in TextIOWrapper._pending_line.
2436 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002437 # XXX: str.encode() should return bytes
2438 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002439 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002440 for bufsize in range(1, 10):
2441 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002442 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2443 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002444 encoding=encoding)
2445 if do_reads:
2446 got_lines = []
2447 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002448 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002449 if c2 == '':
2450 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002451 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002452 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002453 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002454 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002455
2456 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002457 self.assertEqual(got_line, exp_line)
2458 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002459
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002460 def test_newlines_input(self):
2461 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002462 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2463 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002464 (None, normalized.decode("ascii").splitlines(keepends=True)),
2465 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002466 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2467 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2468 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002469 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002470 buf = self.BytesIO(testdata)
2471 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002472 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002473 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002474 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002476 def test_newlines_output(self):
2477 testdict = {
2478 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2479 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2480 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2481 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2482 }
2483 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2484 for newline, expected in tests:
2485 buf = self.BytesIO()
2486 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2487 txt.write("AAA\nB")
2488 txt.write("BB\nCCC\n")
2489 txt.write("X\rY\r\nZ")
2490 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002491 self.assertEqual(buf.closed, False)
2492 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002493
2494 def test_destructor(self):
2495 l = []
2496 base = self.BytesIO
2497 class MyBytesIO(base):
2498 def close(self):
2499 l.append(self.getvalue())
2500 base.close(self)
2501 b = MyBytesIO()
2502 t = self.TextIOWrapper(b, encoding="ascii")
2503 t.write("abc")
2504 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002505 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002506 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507
2508 def test_override_destructor(self):
2509 record = []
2510 class MyTextIO(self.TextIOWrapper):
2511 def __del__(self):
2512 record.append(1)
2513 try:
2514 f = super().__del__
2515 except AttributeError:
2516 pass
2517 else:
2518 f()
2519 def close(self):
2520 record.append(2)
2521 super().close()
2522 def flush(self):
2523 record.append(3)
2524 super().flush()
2525 b = self.BytesIO()
2526 t = MyTextIO(b, encoding="ascii")
2527 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002528 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002529 self.assertEqual(record, [1, 2, 3])
2530
2531 def test_error_through_destructor(self):
2532 # Test that the exception state is not modified by a destructor,
2533 # even if close() fails.
2534 rawio = self.CloseFailureIO()
2535 def f():
2536 self.TextIOWrapper(rawio).xyzzy
2537 with support.captured_output("stderr") as s:
2538 self.assertRaises(AttributeError, f)
2539 s = s.getvalue().strip()
2540 if s:
2541 # The destructor *may* have printed an unraisable error, check it
2542 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002543 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002544 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002545
Guido van Rossum9b76da62007-04-11 01:09:03 +00002546 # Systematic tests of the text I/O API
2547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002548 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002549 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002550 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002551 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002552 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002553 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002554 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002555 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002556 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002557 self.assertEqual(f.tell(), 0)
2558 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002559 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002560 self.assertEqual(f.seek(0), 0)
2561 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002562 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002563 self.assertEqual(f.read(2), "ab")
2564 self.assertEqual(f.read(1), "c")
2565 self.assertEqual(f.read(1), "")
2566 self.assertEqual(f.read(), "")
2567 self.assertEqual(f.tell(), cookie)
2568 self.assertEqual(f.seek(0), 0)
2569 self.assertEqual(f.seek(0, 2), cookie)
2570 self.assertEqual(f.write("def"), 3)
2571 self.assertEqual(f.seek(cookie), cookie)
2572 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002573 if enc.startswith("utf"):
2574 self.multi_line_test(f, enc)
2575 f.close()
2576
2577 def multi_line_test(self, f, enc):
2578 f.seek(0)
2579 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002580 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002581 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002582 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002583 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002584 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002585 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002586 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002587 wlines.append((f.tell(), line))
2588 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002589 f.seek(0)
2590 rlines = []
2591 while True:
2592 pos = f.tell()
2593 line = f.readline()
2594 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002595 break
2596 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002597 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002598
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002599 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002600 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002601 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002602 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002603 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002604 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002605 p2 = f.tell()
2606 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002607 self.assertEqual(f.tell(), p0)
2608 self.assertEqual(f.readline(), "\xff\n")
2609 self.assertEqual(f.tell(), p1)
2610 self.assertEqual(f.readline(), "\xff\n")
2611 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002612 f.seek(0)
2613 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002614 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002615 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002616 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002617 f.close()
2618
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002619 def test_seeking(self):
2620 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002621 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002622 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002623 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002625 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002626 suffix = bytes(u_suffix.encode("utf-8"))
2627 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002628 with self.open(support.TESTFN, "wb") as f:
2629 f.write(line*2)
2630 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2631 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002632 self.assertEqual(s, str(prefix, "ascii"))
2633 self.assertEqual(f.tell(), prefix_size)
2634 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002635
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002636 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002637 # Regression test for a specific bug
2638 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002639 with self.open(support.TESTFN, "wb") as f:
2640 f.write(data)
2641 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2642 f._CHUNK_SIZE # Just test that it exists
2643 f._CHUNK_SIZE = 2
2644 f.readline()
2645 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002646
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002647 def test_seek_and_tell(self):
2648 #Test seek/tell using the StatefulIncrementalDecoder.
2649 # Make test faster by doing smaller seeks
2650 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002651
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002652 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002653 """Tell/seek to various points within a data stream and ensure
2654 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002655 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002656 f.write(data)
2657 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002658 f = self.open(support.TESTFN, encoding='test_decoder')
2659 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002660 decoded = f.read()
2661 f.close()
2662
Neal Norwitze2b07052008-03-18 19:52:05 +00002663 for i in range(min_pos, len(decoded) + 1): # seek positions
2664 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002665 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002666 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002667 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002668 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002669 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002670 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002671 f.close()
2672
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002673 # Enable the test decoder.
2674 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002675
2676 # Run the tests.
2677 try:
2678 # Try each test case.
2679 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002680 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002681
2682 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002683 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2684 offset = CHUNK_SIZE - len(input)//2
2685 prefix = b'.'*offset
2686 # Don't bother seeking into the prefix (takes too long).
2687 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002688 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002689
2690 # Ensure our test decoder won't interfere with subsequent tests.
2691 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002692 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002694 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002695 data = "1234567890"
2696 tests = ("utf-16",
2697 "utf-16-le",
2698 "utf-16-be",
2699 "utf-32",
2700 "utf-32-le",
2701 "utf-32-be")
2702 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002703 buf = self.BytesIO()
2704 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002705 # Check if the BOM is written only once (see issue1753).
2706 f.write(data)
2707 f.write(data)
2708 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002709 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002710 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002711 self.assertEqual(f.read(), data * 2)
2712 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002713
Benjamin Petersona1b49012009-03-31 23:11:32 +00002714 def test_unreadable(self):
2715 class UnReadable(self.BytesIO):
2716 def readable(self):
2717 return False
2718 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002719 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002721 def test_read_one_by_one(self):
2722 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002723 reads = ""
2724 while True:
2725 c = txt.read(1)
2726 if not c:
2727 break
2728 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002729 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002730
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002731 def test_readlines(self):
2732 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2733 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2734 txt.seek(0)
2735 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2736 txt.seek(0)
2737 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2738
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002739 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002741 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002742 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002743 reads = ""
2744 while True:
2745 c = txt.read(128)
2746 if not c:
2747 break
2748 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002749 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002750
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002751 def test_writelines(self):
2752 l = ['ab', 'cd', 'ef']
2753 buf = self.BytesIO()
2754 txt = self.TextIOWrapper(buf)
2755 txt.writelines(l)
2756 txt.flush()
2757 self.assertEqual(buf.getvalue(), b'abcdef')
2758
2759 def test_writelines_userlist(self):
2760 l = UserList(['ab', 'cd', 'ef'])
2761 buf = self.BytesIO()
2762 txt = self.TextIOWrapper(buf)
2763 txt.writelines(l)
2764 txt.flush()
2765 self.assertEqual(buf.getvalue(), b'abcdef')
2766
2767 def test_writelines_error(self):
2768 txt = self.TextIOWrapper(self.BytesIO())
2769 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2770 self.assertRaises(TypeError, txt.writelines, None)
2771 self.assertRaises(TypeError, txt.writelines, b'abc')
2772
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002773 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002774 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002775
2776 # read one char at a time
2777 reads = ""
2778 while True:
2779 c = txt.read(1)
2780 if not c:
2781 break
2782 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002783 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002784
2785 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002787 txt._CHUNK_SIZE = 4
2788
2789 reads = ""
2790 while True:
2791 c = txt.read(4)
2792 if not c:
2793 break
2794 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002795 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002796
2797 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002798 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002799 txt._CHUNK_SIZE = 4
2800
2801 reads = txt.read(4)
2802 reads += txt.read(4)
2803 reads += txt.readline()
2804 reads += txt.readline()
2805 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002806 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002807
2808 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002809 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002810 txt._CHUNK_SIZE = 4
2811
2812 reads = txt.read(4)
2813 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002814 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002815
2816 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002817 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002818 txt._CHUNK_SIZE = 4
2819
2820 reads = txt.read(4)
2821 pos = txt.tell()
2822 txt.seek(0)
2823 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002824 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002825
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002826 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002827 buffer = self.BytesIO(self.testdata)
2828 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002829
2830 self.assertEqual(buffer.seekable(), txt.seekable())
2831
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        # Create the file: exactly one BOM followed by the text.
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        # Append: the result must equal a single encode() of the whole
        # text, i.e. no second BOM in the middle of the file.
        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002846
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_of_text = f.tell()

        with self.open(path, 'r+', encoding=charset) as f:
            # Write past the existing text first, then overwrite the
            # beginning of the file.
            f.seek(end_of_text)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')

        with self.open(path, 'rb') as f:
            raw = f.read()
            self.assertEqual(raw, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002861
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')

        with self.open(path, 'a', encoding=charset) as f:
            f.seek(0)
            f.seek(0, self.SEEK_END)
            f.write('xxx')

        with self.open(path, 'rb') as f:
            raw = f.read()
            self.assertEqual(raw, 'aaaxxx'.encode(charset))
2874
def test_errors_property(self):
    # The errors attribute is "strict" unless a handler is given to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **open_kwargs) as f:
            self.assertEqual(f.errors, expected)
2880
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    thread_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def writer(index):
            line = "Thread%03d\n" % index
            # Block until every thread has been started, so the writes
            # happen as close together as possible.
            start.wait()
            f.write(line)

        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=writer, args=(index,)))
        with support.start_threads(workers, start.set):
            time.sleep(0.02)

    with self.open(support.TESTFN) as f:
        written = f.read()
        # Each thread's line must appear exactly once.
        for index in range(thread_count):
            expected_line = "Thread%03d\n" % index
            self.assertEqual(written.count(expected_line), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002899
Antoine Pitrou6be88762010-05-03 16:48:20 +00002900 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002901 # Test that text file is closed despite failed flush
2902 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002903 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002904 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002905 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002906 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002907 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002908 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002909 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002910 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002911 self.assertTrue(txt.buffer.closed)
2912 self.assertTrue(closed) # flush() called
2913 self.assertFalse(closed[0]) # flush() called before file closed
2914 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002915 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002916
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002917 def test_close_error_on_close(self):
2918 buffer = self.BytesIO(self.testdata)
2919 def bad_flush():
2920 raise OSError('flush')
2921 def bad_close():
2922 raise OSError('close')
2923 buffer.close = bad_close
2924 txt = self.TextIOWrapper(buffer, encoding="ascii")
2925 txt.flush = bad_flush
2926 with self.assertRaises(OSError) as err: # exception not swallowed
2927 txt.close()
2928 self.assertEqual(err.exception.args, ('close',))
2929 self.assertIsInstance(err.exception.__context__, OSError)
2930 self.assertEqual(err.exception.__context__.args, ('flush',))
2931 self.assertFalse(txt.closed)
2932
2933 def test_nonnormalized_close_error_on_close(self):
2934 # Issue #21677
2935 buffer = self.BytesIO(self.testdata)
2936 def bad_flush():
2937 raise non_existing_flush
2938 def bad_close():
2939 raise non_existing_close
2940 buffer.close = bad_close
2941 txt = self.TextIOWrapper(buffer, encoding="ascii")
2942 txt.flush = bad_flush
2943 with self.assertRaises(NameError) as err: # exception not swallowed
2944 txt.close()
2945 self.assertIn('non_existing_close', str(err.exception))
2946 self.assertIsInstance(err.exception.__context__, NameError)
2947 self.assertIn('non_existing_flush', str(err.exception.__context__))
2948 self.assertFalse(txt.closed)
2949
Antoine Pitrou6be88762010-05-03 16:48:20 +00002950 def test_multi_close(self):
2951 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2952 txt.close()
2953 txt.close()
2954 txt.close()
2955 self.assertRaises(ValueError, txt.flush)
2956
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002957 def test_unseekable(self):
2958 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2959 self.assertRaises(self.UnsupportedOperation, txt.tell)
2960 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2961
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002962 def test_readonly_attributes(self):
2963 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2964 buf = self.BytesIO(self.testdata)
2965 with self.assertRaises(AttributeError):
2966 txt.buffer = buf
2967
Antoine Pitroue96ec682011-07-23 21:46:35 +02002968 def test_rawio(self):
2969 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2970 # that subprocess.Popen() can have the required unbuffered
2971 # semantics with universal_newlines=True.
2972 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2973 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2974 # Reads
2975 self.assertEqual(txt.read(4), 'abcd')
2976 self.assertEqual(txt.readline(), 'efghi\n')
2977 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2978
2979 def test_rawio_write_through(self):
2980 # Issue #12591: with write_through=True, writes don't need a flush
2981 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2982 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2983 write_through=True)
2984 txt.write('1')
2985 txt.write('23\n4')
2986 txt.write('5')
2987 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2988
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002989 def test_bufio_write_through(self):
2990 # Issue #21396: write_through=True doesn't force a flush()
2991 # on the underlying binary buffered object.
2992 flush_called, write_called = [], []
2993 class BufferedWriter(self.BufferedWriter):
2994 def flush(self, *args, **kwargs):
2995 flush_called.append(True)
2996 return super().flush(*args, **kwargs)
2997 def write(self, *args, **kwargs):
2998 write_called.append(True)
2999 return super().write(*args, **kwargs)
3000
3001 rawio = self.BytesIO()
3002 data = b"a"
3003 bufio = BufferedWriter(rawio, len(data)*2)
3004 textio = self.TextIOWrapper(bufio, encoding='ascii',
3005 write_through=True)
3006 # write to the buffered io but don't overflow the buffer
3007 text = data.decode('ascii')
3008 textio.write(text)
3009
3010 # buffer.flush is not called with write_through=True
3011 self.assertFalse(flush_called)
3012 # buffer.write *is* called with write_through=True
3013 self.assertTrue(write_called)
3014 self.assertEqual(rawio.getvalue(), b"") # no flush
3015
3016 write_called = [] # reset
3017 textio.write(text * 10) # total content is larger than bufio buffer
3018 self.assertTrue(write_called)
3019 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3020
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003021 def test_read_nonbytes(self):
3022 # Issue #17106
3023 # Crash when underlying read() returns non-bytes
3024 t = self.TextIOWrapper(self.StringIO('a'))
3025 self.assertRaises(TypeError, t.read, 1)
3026 t = self.TextIOWrapper(self.StringIO('a'))
3027 self.assertRaises(TypeError, t.readline)
3028 t = self.TextIOWrapper(self.StringIO('a'))
3029 self.assertRaises(TypeError, t.read)
3030
3031 def test_illegal_decoder(self):
3032 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003033 # Bypass the early encoding check added in issue 20404
3034 def _make_illegal_wrapper():
3035 quopri = codecs.lookup("quopri")
3036 quopri._is_text_encoding = True
3037 try:
3038 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3039 newline='\n', encoding="quopri")
3040 finally:
3041 quopri._is_text_encoding = False
3042 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003043 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003044 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003045 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003046 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003047 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003048 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003049 self.assertRaises(TypeError, t.read)
3050
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose only live object creates a
    # TextIOWrapper (with the given keyword arguments) from __del__, and
    # returns whatever assert_python_ok() returns for that run.
    template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3071
3072 def test_create_at_shutdown_without_encoding(self):
3073 rc, out, err = self._check_create_at_shutdown()
3074 if err:
3075 # Can error out with a RuntimeError if the module state
3076 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003077 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003078 else:
3079 self.assertEqual("ok", out.decode().strip())
3080
3081 def test_create_at_shutdown_with_encoding(self):
3082 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3083 errors='strict')
3084 self.assertFalse(err)
3085 self.assertEqual("ok", out.decode().strip())
3086
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3097
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003098 def test_issue22849(self):
3099 class F(object):
3100 def readable(self): return True
3101 def writable(self): return True
3102 def seekable(self): return True
3103
3104 for i in range(10):
3105 try:
3106 self.TextIOWrapper(F(), encoding='utf-8')
3107 except Exception:
3108 pass
3109
3110 F.tell = lambda x: 0
3111 t = self.TextIOWrapper(F(), encoding='utf-8')
3112
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003113
Antoine Pitroub8503892014-04-29 10:14:02 +02003114class MemviewBytesIO(io.BytesIO):
3115 '''A BytesIO object whose read method returns memoryviews
3116 rather than bytes'''
3117
3118 def read1(self, len_):
3119 return _to_memoryview(super().read1(len_))
3120
3121 def read(self, len_):
3122 return _to_memoryview(super().read(len_))
3123
3124def _to_memoryview(buf):
3125 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3126
3127 arr = array.array('i')
3128 idx = len(buf) - len(buf) % arr.itemsize
3129 arr.frombytes(buf[:idx])
3130 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003131
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003132
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the ``io`` module and
    # adds tests that only apply to that implementation.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-initializing with an invalid newline fails, and reading
        # from the wrapper afterwards fails as well.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object whose __init__() never ran must raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cyclic GC can reclaim the wrapper.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
            self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3177
3178
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the ``pyio`` module.
    io = pyio
    # Message looked for in stderr by the shutdown test.  Previous value:
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003183
3184
3185class IncrementalNewlineDecoderTest(unittest.TestCase):
3186
3187 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003188 # UTF-8 specific tests for a newline decoder
3189 def _check_decode(b, s, **kwargs):
3190 # We exercise getstate() / setstate() as well as decode()
3191 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003192 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003193 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003194 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003195
Antoine Pitrou180a3362008-12-14 16:36:46 +00003196 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003197
Antoine Pitrou180a3362008-12-14 16:36:46 +00003198 _check_decode(b'\xe8', "")
3199 _check_decode(b'\xa2', "")
3200 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003201
Antoine Pitrou180a3362008-12-14 16:36:46 +00003202 _check_decode(b'\xe8', "")
3203 _check_decode(b'\xa2', "")
3204 _check_decode(b'\x88', "\u8888")
3205
3206 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003207 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3208
Antoine Pitrou180a3362008-12-14 16:36:46 +00003209 decoder.reset()
3210 _check_decode(b'\n', "\n")
3211 _check_decode(b'\r', "")
3212 _check_decode(b'', "\n", final=True)
3213 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003214
Antoine Pitrou180a3362008-12-14 16:36:46 +00003215 _check_decode(b'\r', "")
3216 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003217
Antoine Pitrou180a3362008-12-14 16:36:46 +00003218 _check_decode(b'\r\r\n', "\n\n")
3219 _check_decode(b'\r', "")
3220 _check_decode(b'\r', "\n")
3221 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003222
Antoine Pitrou180a3362008-12-14 16:36:46 +00003223 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3224 _check_decode(b'\xe8\xa2\x88', "\u8888")
3225 _check_decode(b'\n', "\n")
3226 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3227 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003228
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003229 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003230 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003231 if encoding is not None:
3232 encoder = codecs.getincrementalencoder(encoding)()
3233 def _decode_bytewise(s):
3234 # Decode one byte at a time
3235 for b in encoder.encode(s):
3236 result.append(decoder.decode(bytes([b])))
3237 else:
3238 encoder = None
3239 def _decode_bytewise(s):
3240 # Decode one char at a time
3241 for c in s:
3242 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003243 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003244 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003245 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003246 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003247 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003248 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003249 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003250 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003251 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003252 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003253 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003254 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003255 input = "abc"
3256 if encoder is not None:
3257 encoder.reset()
3258 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003259 self.assertEqual(decoder.decode(input), "abc")
3260 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003261
3262 def test_newline_decoder(self):
3263 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003264 # None meaning the IncrementalNewlineDecoder takes unicode input
3265 # rather than bytes input
3266 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003267 'utf-16', 'utf-16-le', 'utf-16-be',
3268 'utf-32', 'utf-32-le', 'utf-32-be',
3269 )
3270 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003271 decoder = enc and codecs.getincrementaldecoder(enc)()
3272 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3273 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003274 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003275 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3276 self.check_newline_decoding_utf8(decoder)
3277
Antoine Pitrou66913e22009-03-06 23:40:56 +00003278 def test_newline_bytes(self):
3279 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3280 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003281 self.assertEqual(dec.newlines, None)
3282 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3283 self.assertEqual(dec.newlines, None)
3284 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3285 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003286 dec = self.IncrementalNewlineDecoder(None, translate=False)
3287 _check(dec)
3288 dec = self.IncrementalNewlineDecoder(None, translate=True)
3289 _check(dec)
3290
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; everything is inherited from the base class.
    pass
3293
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; everything is inherited from the base class.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003296
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003297
Guido van Rossum01a27522007-03-07 01:00:12 +00003298# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003299
Guido van Rossum5abbf752007-08-27 17:39:33 +00003300class MiscIOTest(unittest.TestCase):
3301
def tearDown(self):
    # Remove the scratch file used by the tests.
    scratch = support.TESTFN
    support.unlink(scratch)
3304
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003305 def test___all__(self):
3306 for name in self.io.__all__:
3307 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003308 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003309 if name == "open":
3310 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003311 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003312 self.assertTrue(issubclass(obj, Exception), name)
3313 elif not name.startswith("SEEK_"):
3314 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003315
def test_attributes(self):
    # Check the name and mode attributes exposed by each layer of the
    # object returned by open().  Every file is managed by a ``with``
    # block so that a failing assertion cannot leave TESTFN open.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Opening with the "U" mode emits a DeprecationWarning.
    with support.check_warnings(('', DeprecationWarning)):
        f = self.open(support.TESTFN, "U")
    with f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g wraps f's file descriptor without owning it (closefd=False);
        # it is closed first, while the descriptor is still open.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
3343
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, whatever
    # mode and buffering the file was opened with.
    open_kwargs = []
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        open_kwargs += [
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ]

    def expect_closed(func, *args):
        self.assertRaises(ValueError, func, *args)

    for kwargs in open_kwargs:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        expect_closed(f.flush)
        expect_closed(f.fileno)
        expect_closed(f.isatty)
        expect_closed(f.__iter__)
        # peek, read1, readall, readinto and readinto1 are not present
        # on every kind of file object.
        if hasattr(f, "peek"):
            expect_closed(f.peek, 1)
        expect_closed(f.read)
        if hasattr(f, "read1"):
            expect_closed(f.read1, 1024)
        if hasattr(f, "readall"):
            expect_closed(f.readall)
        if hasattr(f, "readinto"):
            expect_closed(f.readinto, bytearray(1024))
        if hasattr(f, "readinto1"):
            expect_closed(f.readinto1, bytearray(1024))
        expect_closed(f.readline)
        expect_closed(f.readlines)
        expect_closed(f.seek, 0)
        expect_closed(f.tell)
        expect_closed(f.truncate)
        # write() needs bytes for binary files and str for text files.
        empty = b"" if "b" in kwargs['mode'] else ""
        expect_closed(f.write, empty)
        expect_closed(f.writelines, [])
        expect_closed(next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003388
def test_blockingioerror(self):
    # Various BlockingIOError issues
    class Message(str):
        pass

    # Build a reference cycle between the exception and its message and
    # check that the garbage collector can reclaim it.
    message = Message("")
    error = self.BlockingIOError(1, message)
    message.b = error
    error.c = message
    ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertIsNone(ref(), ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003401
3402 def test_abcs(self):
3403 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003404 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3405 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3406 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3407 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003408
def _check_abc_inheritance(self, abcmodule):
    # For each kind of file open() can return, check that it is an
    # instance of abcmodule.IOBase and of exactly one of the three more
    # specific ABCs taken from *abcmodule*.
    specific = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    expectations = (
        (("wb",), {"buffering": 0}, "RawIOBase"),
        (("wb",), {}, "BufferedIOBase"),
        (("w",), {}, "TextIOBase"),
    )
    for args, kwargs, layer in expectations:
        with self.open(support.TESTFN, *args, **kwargs) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for name in specific:
                if name == layer:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(f, getattr(abcmodule, name))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003425
def test_abc_inheritance(self):
    # The test case is passed as the "module": the ABCs are looked up as
    # attributes of the test case itself.
    abc_namespace = self
    self._check_abc_inheritance(abc_namespace)
3429
def test_abc_inheritance_official(self):
    # Objects of the implementation under test must also be recognised by
    # the official ABCs of the baseline "io" module.
    official_abcs = io
    self._check_abc_inheritance(official_abcs)
3434
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Drop the only reference to a freshly opened file and check that a
    # ResourceWarning mentioning the repr of that file is emitted.
    # NOTE(review): this calls the builtin open(), not self.open -- confirm
    # that this is intended for every subclass running this helper.
    leaked = open(*args, **kwargs)
    expected_repr = repr(leaked)
    with self.assertWarns(ResourceWarning) as caught:
        leaked = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
3442
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text files are all checked.
    for mode, extra in (("wb", {"buffering": 0}), ("wb", {}), ("w", {})):
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
3447
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Variant of _check_warn_on_dealloc() for files opened from the read
    # end of a pipe instead of a path.
    pipe_fds = []

    def close_pipe_fds():
        # Close every descriptor created below; a descriptor that is no
        # longer valid (EBADF) is tolerated, any other error propagates.
        for fd in pipe_fds:
            try:
                os.close(fd)
            except OSError as exc:
                if exc.errno == errno.EBADF:
                    continue
                raise

    self.addCleanup(close_pipe_fds)

    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)

    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    with support.check_no_resource_warning(self):
        open(read_fd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003466
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary and text readers are all checked.
    for mode, extra in (("rb", {"buffering": 0}), ("rb", {}), ("r", {})):
        self._check_warn_on_dealloc_fd(mode, **extra)
3471
3472
def test_pickling(self):
    # Pickling file objects is forbidden
    open_kwargs = []
    # The write modes come first so that the file exists when it is opened
    # for reading.
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_kwargs.append({"mode": base_mode})
        open_kwargs.append({"mode": binary_mode})
        open_kwargs.append({"mode": binary_mode, "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_kwargs:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
3489
def test_nonblock_pipe_write_bigbuf(self):
    # Run the non-blocking pipe scenario with a 16 KiB buffer.
    buffer_size = 16 * 1024
    self._test_nonblock_pipe_write(buffer_size)
3492
def test_nonblock_pipe_write_smallbuf(self):
    # Run the non-blocking pipe scenario with a 1 KiB buffer.
    buffer_size = 1024
    self._test_nonblock_pipe_write(buffer_size)
3495
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    # Push data through a non-blocking pipe wrapped in buffered objects of
    # size *bufsize* and check that everything written is read back, even
    # though writes are regularly cut short by BlockingIOError.
    sent_chunks = []
    received_chunks = []
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    os.set_blocking(write_fd, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    reader = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
    writer = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

    with reader, writer:
        for chunk_len in (9999, 73, 7574):
            try:
                # Keep writing chunks of one repeated lowercase letter
                # until the writer reports that it would block.
                counter = 0
                while True:
                    chunk = bytes([97 + counter % 26]) * chunk_len
                    sent_chunks.append(chunk)
                    writer.write(chunk)
                    counter += 1

            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                # Only the accepted prefix of the last chunk counts as sent.
                sent_chunks[-1] = sent_chunks[-1][:exc.characters_written]
                received_chunks.append(reader.read())
                chunk = b'BLOCKED'
                writer.write(chunk)
                sent_chunks.append(chunk)

        # Flush what is still buffered, reading from the pipe every time
        # the flush reports that it would block.
        while True:
            try:
                writer.flush()
                break
            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                self.assertEqual(exc.characters_written, 0)
                received_chunks.append(reader.read())

        # Collect the remaining data until read() returns None.
        received_chunks.extend(iter(reader.read, None))

    sent, received = b''.join(sent_chunks), b''.join(received_chunks)
    self.assertEqual(sent, received)
    self.assertTrue(writer.closed)
    self.assertTrue(reader.closed)
3547
def test_create_fail(self):
    # 'x' mode fails if file is existing
    with self.open(support.TESTFN, 'w'):
        # Opening for writing is only used to make sure the file exists.
        pass
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
3553
def test_create_writes(self):
    # 'x' mode opens for writing
    with self.open(support.TESTFN, 'xb') as created:
        created.write(b"spam")
    # Read the file back to check that the data was written.
    with self.open(support.TESTFN, 'rb') as reopened:
        content = reopened.read()
        self.assertEqual(b"spam", content)
3560
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
3564
3565
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the "io" module and add a few tests
    # that only exist for this implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class BadReader(self.io.BufferedIOBase):
            # read() returns far more data than any caller asks for.
            def read(self, n=-1):
                return b'x' * 10**6
        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only contain the characters written
            # by the child script itself.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads".format(stream_name=stream_name))
        self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        stream_name = 'stdout'
        self.check_daemon_threads_shutdown_deadlock(stream_name)

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        stream_name = 'stderr'
        self.check_daemon_threads_shutdown_deadlock(stream_name)
3619
3620
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite with "pyio" as the module under test.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003623
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003624
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests for the interaction between SIGALRM and reads/writes on pipes.
    # Subclasses provide the module under test as the "io" attribute.

    def setUp(self):
        # Install a handler that raises, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the payload, *bytes*
        is its expected binary form as read back from the pipe, and
        *fdopen_kwargs* are passed to self.io.open() for the write end.
        """
        read_results = []
        def _read():
            # Block SIGALRM in this thread where the platform allows it.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        # Write *data* in a loop while a SIGALRM handler writes to the same
        # file object, and check how the reentrant call is reported.
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case the loop was left before it fired,
            # so that it cannot go off after the handler is restored.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str and
        *fdopen_kwargs* are passed to self.io.open() for the read end.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case the read failed before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the unit of data repeated to build the payload and
        *fdopen_kwargs* are passed to self.io.open() for the write end.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that no error was recorded.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: install the second handler and re-arm the timer.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name when open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that is still pending after a failure.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3832
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003833
class CSignalsTest(SignalsTest):
    # Run the SignalsTest suite with "io" as the module under test.
    io = io
3836
class PySignalsTest(SignalsTest):
    # Run the SignalsTest suite with "pyio" as the module under test.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  The inherited test methods are overridden with
    # None, which leaves a non-callable attribute under each name.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3844
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003845
def load_tests(*args):
    # Implements the unittest load_tests protocol for this module: inject
    # the io namespaces and mock classes into the test classes, then return
    # a suite with the tests of all of them.
    #
    # *args* are the protocol arguments (loader, standard_tests, pattern)
    # when called by unittest; the function can also be called without any.
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has a "C"- and a "Py"-prefixed variant at module level; both
    # are stored under the unprefixed name in the matching namespace.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        # The class name prefix selects the namespace; classes with neither
        # prefix are left untouched.
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated; load through the loader given by
    # the load_tests protocol, or the default one when called directly.
    loader = args[0] if args else unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003881
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()