blob: baa6965c4a2fa28bdc38aeee74014d4785622009 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
Antoine Pitrou6391b342010-09-14 18:48:19 +000063class MockRawIOWithoutRead:
64 """A RawIO implementation without read(), so as to exercise the default
65 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000066
67 def __init__(self, read_stack=()):
68 self._read_stack = list(read_stack)
69 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000070 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000071 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000072
Christian Heimes1a6387e2008-03-26 12:49:49 +000073 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000074 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000075 return len(b)
76
77 def writable(self):
78 return True
79
80 def fileno(self):
81 return 42
82
83 def readable(self):
84 return True
85
86 def seekable(self):
87 return True
88
89 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000091
92 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # same comment as above
94
95 def readinto(self, buf):
96 self._reads += 1
97 max_len = len(buf)
98 try:
99 data = self._read_stack[0]
100 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000101 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000102 return 0
103 if data is None:
104 del self._read_stack[0]
105 return None
106 n = len(data)
107 if len(data) <= max_len:
108 del self._read_stack[0]
109 buf[:n] = data
110 return n
111 else:
112 buf[:] = data[:max_len]
113 self._read_stack[0] = data[max_len:]
114 return max_len
115
116 def truncate(self, pos=None):
117 return pos
118
Antoine Pitrou6391b342010-09-14 18:48:19 +0000119class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
120 pass
121
122class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
123 pass
124
125
126class MockRawIO(MockRawIOWithoutRead):
127
128 def read(self, n=None):
129 self._reads += 1
130 try:
131 return self._read_stack.pop(0)
132 except:
133 self._extraneous_reads += 1
134 return b""
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class CMockRawIO(MockRawIO, io.RawIOBase):
137 pass
138
139class PyMockRawIO(MockRawIO, pyio.RawIOBase):
140 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
Antoine Pitrou19690592009-06-12 20:14:08 +0000143class MisbehavedRawIO(MockRawIO):
144 def write(self, b):
145 return MockRawIO.write(self, b) * 2
146
147 def read(self, n=None):
148 return MockRawIO.read(self, n) * 2
149
150 def seek(self, pos, whence):
151 return -123
152
153 def tell(self):
154 return -456
155
156 def readinto(self, buf):
157 MockRawIO.readinto(self, buf)
158 return len(buf) * 5
159
160class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
161 pass
162
163class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
164 pass
165
166
167class CloseFailureIO(MockRawIO):
168 closed = 0
169
170 def close(self):
171 if not self.closed:
172 self.closed = 1
173 raise IOError
174
175class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
176 pass
177
178class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
179 pass
180
181
182class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183
184 def __init__(self, data):
185 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
188 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190 self.read_history.append(None if res is None else len(res))
191 return res
192
Antoine Pitrou19690592009-06-12 20:14:08 +0000193 def readinto(self, b):
194 res = super(MockFileIO, self).readinto(b)
195 self.read_history.append(res)
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class CMockFileIO(MockFileIO, io.BytesIO):
199 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class PyMockFileIO(MockFileIO, pyio.BytesIO):
202 pass
203
204
205class MockNonBlockWriterIO:
206
207 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000208 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000209 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000210
Antoine Pitrou19690592009-06-12 20:14:08 +0000211 def pop_written(self):
212 s = b"".join(self._write_stack)
213 self._write_stack[:] = []
214 return s
215
216 def block_on(self, char):
217 """Block when a given char is encountered."""
218 self._blocker_char = char
219
220 def readable(self):
221 return True
222
223 def seekable(self):
224 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
226 def writable(self):
227 return True
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def write(self, b):
230 b = bytes(b)
231 n = -1
232 if self._blocker_char:
233 try:
234 n = b.index(self._blocker_char)
235 except ValueError:
236 pass
237 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100238 if n > 0:
239 # write data up to the first blocker
240 self._write_stack.append(b[:n])
241 return n
242 else:
243 # cancel blocker and indicate would block
244 self._blocker_char = None
245 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000429 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000547 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200567 def check_flush_error_on_close(self, *args, **kwargs):
568 # Test that the file is closed despite failed flush
569 # and that flush() is called before file closed.
570 f = self.open(*args, **kwargs)
571 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000572 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200573 closed[:] = [f.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574 raise IOError()
575 f.flush = bad_flush
576 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600577 self.assertTrue(f.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200578 self.assertTrue(closed) # flush() called
579 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200580 f.flush = lambda: None # break reference loop
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200581
582 def test_flush_error_on_close(self):
583 # raw file
584 # Issue #5700: io.FileIO calls flush() after file closed
585 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
586 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
587 self.check_flush_error_on_close(fd, 'wb', buffering=0)
588 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
589 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
590 os.close(fd)
591 # buffered io
592 self.check_flush_error_on_close(support.TESTFN, 'wb')
593 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
594 self.check_flush_error_on_close(fd, 'wb')
595 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
596 self.check_flush_error_on_close(fd, 'wb', closefd=False)
597 os.close(fd)
598 # text io
599 self.check_flush_error_on_close(support.TESTFN, 'w')
600 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
601 self.check_flush_error_on_close(fd, 'w')
602 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
603 self.check_flush_error_on_close(fd, 'w', closefd=False)
604 os.close(fd)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000605
606 def test_multi_close(self):
607 f = self.open(support.TESTFN, "wb", buffering=0)
608 f.close()
609 f.close()
610 f.close()
611 self.assertRaises(ValueError, f.flush)
612
Antoine Pitrou6391b342010-09-14 18:48:19 +0000613 def test_RawIOBase_read(self):
614 # Exercise the default RawIOBase.read() implementation (which calls
615 # readinto() internally).
616 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
617 self.assertEqual(rawio.read(2), b"ab")
618 self.assertEqual(rawio.read(2), b"c")
619 self.assertEqual(rawio.read(2), b"d")
620 self.assertEqual(rawio.read(2), None)
621 self.assertEqual(rawio.read(2), b"ef")
622 self.assertEqual(rawio.read(2), b"g")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"")
625
Hynek Schlawack877effc2012-05-25 09:24:18 +0200626 def test_fileio_closefd(self):
627 # Issue #4841
628 with self.open(__file__, 'rb') as f1, \
629 self.open(__file__, 'rb') as f2:
630 fileio = self.FileIO(f1.fileno(), closefd=False)
631 # .__init__() must not close f1
632 fileio.__init__(f2.fileno(), closefd=False)
633 f1.readline()
634 # .close() must not close f2
635 fileio.close()
636 f2.readline()
637
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300638 def test_nonbuffered_textio(self):
639 with warnings.catch_warnings(record=True) as recorded:
640 with self.assertRaises(ValueError):
641 self.open(support.TESTFN, 'w', buffering=0)
642 support.gc_collect()
643 self.assertEqual(recorded, [])
644
645 def test_invalid_newline(self):
646 with warnings.catch_warnings(record=True) as recorded:
647 with self.assertRaises(ValueError):
648 self.open(support.TESTFN, 'w', newline='invalid')
649 support.gc_collect()
650 self.assertEqual(recorded, [])
651
Hynek Schlawack877effc2012-05-25 09:24:18 +0200652
Antoine Pitrou19690592009-06-12 20:14:08 +0000653class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200654
655 def test_IOBase_finalize(self):
656 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
657 # class which inherits IOBase and an object of this class are caught
658 # in a reference cycle and close() is already in the method cache.
659 class MyIO(self.IOBase):
660 def close(self):
661 pass
662
663 # create an instance to populate the method cache
664 MyIO()
665 obj = MyIO()
666 obj.obj = obj
667 wr = weakref.ref(obj)
668 del MyIO
669 del obj
670 support.gc_collect()
671 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000672
Antoine Pitrou19690592009-06-12 20:14:08 +0000673class PyIOTest(IOTest):
674 test_array_writes = unittest.skip(
675 "len(array.array) returns number of elements rather than bytelength"
676 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677
678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class CommonBufferedTests:
680 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
681
682 def test_detach(self):
683 raw = self.MockRawIO()
684 buf = self.tp(raw)
685 self.assertIs(buf.detach(), raw)
686 self.assertRaises(ValueError, buf.detach)
687
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600688 repr(buf) # Should still work
689
Antoine Pitrou19690592009-06-12 20:14:08 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melotti2623a372010-11-21 13:34:58 +0000694 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000695
Zachary Ware1f702212013-12-10 14:09:20 -0600696 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
707 self.assertRaises(ValueError, bufio.seek, 0, 3)
708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super(MyBufferedIO, self).__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super(MyBufferedIO, self).close()
724 def flush(self):
725 record.append(3)
726 super(MyBufferedIO, self).flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
731 support.gc_collect()
732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000761 self.assertTrue(s.startswith("Exception IOError: "), s)
762 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000763
764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000773
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000774 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200775 # Test that buffered file is closed despite failed flush
776 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000777 raw = self.MockRawIO()
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200778 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000779 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200780 closed[:] = [b.closed, raw.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000781 raise IOError()
782 raw.flush = bad_flush
783 b = self.tp(raw)
784 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600785 self.assertTrue(b.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +0200786 self.assertTrue(raw.closed)
787 self.assertTrue(closed) # flush() called
788 self.assertFalse(closed[0]) # flush() called before file closed
789 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +0200790 raw.flush = lambda: None # break reference loop
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600791
792 def test_close_error_on_close(self):
793 raw = self.MockRawIO()
794 def bad_flush():
795 raise IOError('flush')
796 def bad_close():
797 raise IOError('close')
798 raw.close = bad_close
799 b = self.tp(raw)
800 b.flush = bad_flush
801 with self.assertRaises(IOError) as err: # exception not swallowed
802 b.close()
803 self.assertEqual(err.exception.args, ('close',))
804 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000805
806 def test_multi_close(self):
807 raw = self.MockRawIO()
808 b = self.tp(raw)
809 b.close()
810 b.close()
811 b.close()
812 self.assertRaises(ValueError, b.flush)
813
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000814 def test_readonly_attributes(self):
815 raw = self.MockRawIO()
816 buf = self.tp(raw)
817 x = self.MockRawIO()
818 with self.assertRaises((AttributeError, TypeError)):
819 buf.raw = x
820
Christian Heimes1a6387e2008-03-26 12:49:49 +0000821
Antoine Pitroubff5df02012-07-29 19:02:46 +0200822class SizeofTest:
823
824 @support.cpython_only
825 def test_sizeof(self):
826 bufsize1 = 4096
827 bufsize2 = 8192
828 rawio = self.MockRawIO()
829 bufio = self.tp(rawio, buffer_size=bufsize1)
830 size = sys.getsizeof(bufio) - bufsize1
831 rawio = self.MockRawIO()
832 bufio = self.tp(rawio, buffer_size=bufsize2)
833 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
834
835
Antoine Pitrou19690592009-06-12 20:14:08 +0000836class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
837 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000838
Antoine Pitrou19690592009-06-12 20:14:08 +0000839 def test_constructor(self):
840 rawio = self.MockRawIO([b"abc"])
841 bufio = self.tp(rawio)
842 bufio.__init__(rawio)
843 bufio.__init__(rawio, buffer_size=1024)
844 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000845 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000846 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
847 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
848 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
849 rawio = self.MockRawIO([b"abc"])
850 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000851 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000852
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200853 def test_uninitialized(self):
854 bufio = self.tp.__new__(self.tp)
855 del bufio
856 bufio = self.tp.__new__(self.tp)
857 self.assertRaisesRegexp((ValueError, AttributeError),
858 'uninitialized|has no attribute',
859 bufio.read, 0)
860 bufio.__init__(self.MockRawIO())
861 self.assertEqual(bufio.read(0), b'')
862
Antoine Pitrou19690592009-06-12 20:14:08 +0000863 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000864 for arg in (None, 7):
865 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
866 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000867 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000868 # Invalid args
869 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000870
Antoine Pitrou19690592009-06-12 20:14:08 +0000871 def test_read1(self):
872 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
873 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000874 self.assertEqual(b"a", bufio.read(1))
875 self.assertEqual(b"b", bufio.read1(1))
876 self.assertEqual(rawio._reads, 1)
877 self.assertEqual(b"c", bufio.read1(100))
878 self.assertEqual(rawio._reads, 1)
879 self.assertEqual(b"d", bufio.read1(100))
880 self.assertEqual(rawio._reads, 2)
881 self.assertEqual(b"efg", bufio.read1(100))
882 self.assertEqual(rawio._reads, 3)
883 self.assertEqual(b"", bufio.read1(100))
884 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000885 # Invalid args
886 self.assertRaises(ValueError, bufio.read1, -1)
887
888 def test_readinto(self):
889 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
890 bufio = self.tp(rawio)
891 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000892 self.assertEqual(bufio.readinto(b), 2)
893 self.assertEqual(b, b"ab")
894 self.assertEqual(bufio.readinto(b), 2)
895 self.assertEqual(b, b"cd")
896 self.assertEqual(bufio.readinto(b), 2)
897 self.assertEqual(b, b"ef")
898 self.assertEqual(bufio.readinto(b), 1)
899 self.assertEqual(b, b"gf")
900 self.assertEqual(bufio.readinto(b), 0)
901 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000902
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000903 def test_readlines(self):
904 def bufio():
905 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
906 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000907 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
908 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
909 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000910
Antoine Pitrou19690592009-06-12 20:14:08 +0000911 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000912 data = b"abcdefghi"
913 dlen = len(data)
914
915 tests = [
916 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
917 [ 100, [ 3, 3, 3], [ dlen ] ],
918 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
919 ]
920
921 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000922 rawio = self.MockFileIO(data)
923 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000924 pos = 0
925 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000926 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000927 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000928 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000929 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000930
Antoine Pitrou19690592009-06-12 20:14:08 +0000931 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000932 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000933 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
934 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000935 self.assertEqual(b"abcd", bufio.read(6))
936 self.assertEqual(b"e", bufio.read(1))
937 self.assertEqual(b"fg", bufio.read())
938 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200939 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000940 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000941
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200942 rawio = self.MockRawIO((b"a", None, None))
943 self.assertEqual(b"a", rawio.readall())
944 self.assertIsNone(rawio.readall())
945
Antoine Pitrou19690592009-06-12 20:14:08 +0000946 def test_read_past_eof(self):
947 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
948 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000949
Ezio Melotti2623a372010-11-21 13:34:58 +0000950 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000951
Antoine Pitrou19690592009-06-12 20:14:08 +0000952 def test_read_all(self):
953 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
954 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000955
Ezio Melotti2623a372010-11-21 13:34:58 +0000956 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000957
Victor Stinner6a102812010-04-27 23:55:59 +0000958 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000959 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000961 try:
962 # Write out many bytes with exactly the same number of 0's,
963 # 1's... 255's. This will help us check that concurrent reading
964 # doesn't duplicate or forget contents.
965 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000966 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000967 random.shuffle(l)
968 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000969 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000970 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000971 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000972 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000973 errors = []
974 results = []
975 def f():
976 try:
977 # Intra-buffer read then buffer-flushing read
978 for n in cycle([1, 19]):
979 s = bufio.read(n)
980 if not s:
981 break
982 # list.append() is atomic
983 results.append(s)
984 except Exception as e:
985 errors.append(e)
986 raise
987 threads = [threading.Thread(target=f) for x in range(20)]
988 for t in threads:
989 t.start()
990 time.sleep(0.02) # yield
991 for t in threads:
992 t.join()
993 self.assertFalse(errors,
994 "the following exceptions were caught: %r" % errors)
995 s = b''.join(results)
996 for i in range(256):
997 c = bytes(bytearray([i]))
998 self.assertEqual(s.count(c), N)
999 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001000 support.unlink(support.TESTFN)
1001
1002 def test_misbehaved_io(self):
1003 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1004 bufio = self.tp(rawio)
1005 self.assertRaises(IOError, bufio.seek, 0)
1006 self.assertRaises(IOError, bufio.tell)
1007
Antoine Pitroucb4f47c2010-08-11 13:40:17 +00001008 def test_no_extraneous_read(self):
1009 # Issue #9550; when the raw IO object has satisfied the read request,
1010 # we should not issue any additional reads, otherwise it may block
1011 # (e.g. socket).
1012 bufsize = 16
1013 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1014 rawio = self.MockRawIO([b"x" * n])
1015 bufio = self.tp(rawio, bufsize)
1016 self.assertEqual(bufio.read(n), b"x" * n)
1017 # Simple case: one raw read is enough to satisfy the request.
1018 self.assertEqual(rawio._extraneous_reads, 0,
1019 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1020 # A more complex case where two raw reads are needed to satisfy
1021 # the request.
1022 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1023 bufio = self.tp(rawio, bufsize)
1024 self.assertEqual(bufio.read(n), b"x" * n)
1025 self.assertEqual(rawio._extraneous_reads, 0,
1026 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1027
1028
Antoine Pitroubff5df02012-07-29 19:02:46 +02001029class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001030 tp = io.BufferedReader
1031
1032 def test_constructor(self):
1033 BufferedReaderTest.test_constructor(self)
1034 # The allocation can succeed on 32-bit builds, e.g. with more
1035 # than 2GB RAM and a 64-bit kernel.
1036 if sys.maxsize > 0x7FFFFFFF:
1037 rawio = self.MockRawIO()
1038 bufio = self.tp(rawio)
1039 self.assertRaises((OverflowError, MemoryError, ValueError),
1040 bufio.__init__, rawio, sys.maxsize)
1041
1042 def test_initialization(self):
1043 rawio = self.MockRawIO([b"abc"])
1044 bufio = self.tp(rawio)
1045 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1046 self.assertRaises(ValueError, bufio.read)
1047 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1048 self.assertRaises(ValueError, bufio.read)
1049 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1050 self.assertRaises(ValueError, bufio.read)
1051
1052 def test_misbehaved_io_read(self):
1053 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1054 bufio = self.tp(rawio)
1055 # _pyio.BufferedReader seems to implement reading different, so that
1056 # checking this is not so easy.
1057 self.assertRaises(IOError, bufio.read, 10)
1058
1059 def test_garbage_collection(self):
1060 # C BufferedReader objects are collected.
1061 # The Python version has __del__, so it ends into gc.garbage instead
1062 rawio = self.FileIO(support.TESTFN, "w+b")
1063 f = self.tp(rawio)
1064 f.f = f
1065 wr = weakref.ref(f)
1066 del f
1067 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001068 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001069
R David Murray5b2cf5e2013-02-23 22:11:21 -05001070 def test_args_error(self):
1071 # Issue #17275
1072 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1073 self.tp(io.BytesIO(), 1024, 1024, 1024)
1074
1075
Antoine Pitrou19690592009-06-12 20:14:08 +00001076class PyBufferedReaderTest(BufferedReaderTest):
1077 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001078
1079
Antoine Pitrou19690592009-06-12 20:14:08 +00001080class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1081 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001082
Antoine Pitrou19690592009-06-12 20:14:08 +00001083 def test_constructor(self):
1084 rawio = self.MockRawIO()
1085 bufio = self.tp(rawio)
1086 bufio.__init__(rawio)
1087 bufio.__init__(rawio, buffer_size=1024)
1088 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001089 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001090 bufio.flush()
1091 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1092 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1093 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1094 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001095 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001096 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001097 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001098
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001099 def test_uninitialized(self):
1100 bufio = self.tp.__new__(self.tp)
1101 del bufio
1102 bufio = self.tp.__new__(self.tp)
1103 self.assertRaisesRegexp((ValueError, AttributeError),
1104 'uninitialized|has no attribute',
1105 bufio.write, b'')
1106 bufio.__init__(self.MockRawIO())
1107 self.assertEqual(bufio.write(b''), 0)
1108
Antoine Pitrou19690592009-06-12 20:14:08 +00001109 def test_detach_flush(self):
1110 raw = self.MockRawIO()
1111 buf = self.tp(raw)
1112 buf.write(b"howdy!")
1113 self.assertFalse(raw._write_stack)
1114 buf.detach()
1115 self.assertEqual(raw._write_stack, [b"howdy!"])
1116
1117 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001118 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001119 writer = self.MockRawIO()
1120 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001121 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001122 self.assertFalse(writer._write_stack)
1123
Antoine Pitrou19690592009-06-12 20:14:08 +00001124 def test_write_overflow(self):
1125 writer = self.MockRawIO()
1126 bufio = self.tp(writer, 8)
1127 contents = b"abcdefghijklmnop"
1128 for n in range(0, len(contents), 3):
1129 bufio.write(contents[n:n+3])
1130 flushed = b"".join(writer._write_stack)
1131 # At least (total - 8) bytes were implicitly flushed, perhaps more
1132 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001133 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001134
Antoine Pitrou19690592009-06-12 20:14:08 +00001135 def check_writes(self, intermediate_func):
1136 # Lots of writes, test the flushed output is as expected.
1137 contents = bytes(range(256)) * 1000
1138 n = 0
1139 writer = self.MockRawIO()
1140 bufio = self.tp(writer, 13)
1141 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1142 def gen_sizes():
1143 for size in count(1):
1144 for i in range(15):
1145 yield size
1146 sizes = gen_sizes()
1147 while n < len(contents):
1148 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001149 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001150 intermediate_func(bufio)
1151 n += size
1152 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001153 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001154 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001155
Antoine Pitrou19690592009-06-12 20:14:08 +00001156 def test_writes(self):
1157 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001158
Antoine Pitrou19690592009-06-12 20:14:08 +00001159 def test_writes_and_flushes(self):
1160 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001161
Antoine Pitrou19690592009-06-12 20:14:08 +00001162 def test_writes_and_seeks(self):
1163 def _seekabs(bufio):
1164 pos = bufio.tell()
1165 bufio.seek(pos + 1, 0)
1166 bufio.seek(pos - 1, 0)
1167 bufio.seek(pos, 0)
1168 self.check_writes(_seekabs)
1169 def _seekrel(bufio):
1170 pos = bufio.seek(0, 1)
1171 bufio.seek(+1, 1)
1172 bufio.seek(-1, 1)
1173 bufio.seek(pos, 0)
1174 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001175
Antoine Pitrou19690592009-06-12 20:14:08 +00001176 def test_writes_and_truncates(self):
1177 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001178
Antoine Pitrou19690592009-06-12 20:14:08 +00001179 def test_write_non_blocking(self):
1180 raw = self.MockNonBlockWriterIO()
1181 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001182
Ezio Melotti2623a372010-11-21 13:34:58 +00001183 self.assertEqual(bufio.write(b"abcd"), 4)
1184 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001185 # 1 byte will be written, the rest will be buffered
1186 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001187 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001188
Antoine Pitrou19690592009-06-12 20:14:08 +00001189 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1190 raw.block_on(b"0")
1191 try:
1192 bufio.write(b"opqrwxyz0123456789")
1193 except self.BlockingIOError as e:
1194 written = e.characters_written
1195 else:
1196 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001197 self.assertEqual(written, 16)
1198 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001199 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001200
Ezio Melotti2623a372010-11-21 13:34:58 +00001201 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001202 s = raw.pop_written()
1203 # Previously buffered bytes were flushed
1204 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001205
Antoine Pitrou19690592009-06-12 20:14:08 +00001206 def test_write_and_rewind(self):
1207 raw = io.BytesIO()
1208 bufio = self.tp(raw, 4)
1209 self.assertEqual(bufio.write(b"abcdef"), 6)
1210 self.assertEqual(bufio.tell(), 6)
1211 bufio.seek(0, 0)
1212 self.assertEqual(bufio.write(b"XY"), 2)
1213 bufio.seek(6, 0)
1214 self.assertEqual(raw.getvalue(), b"XYcdef")
1215 self.assertEqual(bufio.write(b"123456"), 6)
1216 bufio.flush()
1217 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001218
Antoine Pitrou19690592009-06-12 20:14:08 +00001219 def test_flush(self):
1220 writer = self.MockRawIO()
1221 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001222 bufio.write(b"abc")
1223 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001224 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001225
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001226 def test_writelines(self):
1227 l = [b'ab', b'cd', b'ef']
1228 writer = self.MockRawIO()
1229 bufio = self.tp(writer, 8)
1230 bufio.writelines(l)
1231 bufio.flush()
1232 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1233
1234 def test_writelines_userlist(self):
1235 l = UserList([b'ab', b'cd', b'ef'])
1236 writer = self.MockRawIO()
1237 bufio = self.tp(writer, 8)
1238 bufio.writelines(l)
1239 bufio.flush()
1240 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1241
1242 def test_writelines_error(self):
1243 writer = self.MockRawIO()
1244 bufio = self.tp(writer, 8)
1245 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1246 self.assertRaises(TypeError, bufio.writelines, None)
1247
Antoine Pitrou19690592009-06-12 20:14:08 +00001248 def test_destructor(self):
1249 writer = self.MockRawIO()
1250 bufio = self.tp(writer, 8)
1251 bufio.write(b"abc")
1252 del bufio
1253 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001254 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001255
1256 def test_truncate(self):
1257 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001258 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001259 bufio = self.tp(raw, 8)
1260 bufio.write(b"abcdef")
1261 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001262 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001263 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001264 self.assertEqual(f.read(), b"abc")
1265
Victor Stinner6a102812010-04-27 23:55:59 +00001266 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001267 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001268 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001269 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001270 # Write out many bytes from many threads and test they were
1271 # all flushed.
1272 N = 1000
1273 contents = bytes(range(256)) * N
1274 sizes = cycle([1, 19])
1275 n = 0
1276 queue = deque()
1277 while n < len(contents):
1278 size = next(sizes)
1279 queue.append(contents[n:n+size])
1280 n += size
1281 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001282 # We use a real file object because it allows us to
1283 # exercise situations where the GIL is released before
1284 # writing the buffer to the raw streams. This is in addition
1285 # to concurrency issues due to switching threads in the middle
1286 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001287 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001288 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001289 errors = []
1290 def f():
1291 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001292 while True:
1293 try:
1294 s = queue.popleft()
1295 except IndexError:
1296 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001297 bufio.write(s)
1298 except Exception as e:
1299 errors.append(e)
1300 raise
1301 threads = [threading.Thread(target=f) for x in range(20)]
1302 for t in threads:
1303 t.start()
1304 time.sleep(0.02) # yield
1305 for t in threads:
1306 t.join()
1307 self.assertFalse(errors,
1308 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001309 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001310 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001311 s = f.read()
1312 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001313 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001314 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001315 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001316
Antoine Pitrou19690592009-06-12 20:14:08 +00001317 def test_misbehaved_io(self):
1318 rawio = self.MisbehavedRawIO()
1319 bufio = self.tp(rawio, 5)
1320 self.assertRaises(IOError, bufio.seek, 0)
1321 self.assertRaises(IOError, bufio.tell)
1322 self.assertRaises(IOError, bufio.write, b"abcdef")
1323
1324 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001325 with support.check_warnings(("max_buffer_size is deprecated",
1326 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001327 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001328
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001329 def test_write_error_on_close(self):
1330 raw = self.MockRawIO()
1331 def bad_write(b):
1332 raise IOError()
1333 raw.write = bad_write
1334 b = self.tp(raw)
1335 b.write(b'spam')
1336 self.assertRaises(IOError, b.close) # exception not swallowed
1337 self.assertTrue(b.closed)
1338
Antoine Pitrou19690592009-06-12 20:14:08 +00001339
Antoine Pitroubff5df02012-07-29 19:02:46 +02001340class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001341 tp = io.BufferedWriter
1342
1343 def test_constructor(self):
1344 BufferedWriterTest.test_constructor(self)
1345 # The allocation can succeed on 32-bit builds, e.g. with more
1346 # than 2GB RAM and a 64-bit kernel.
1347 if sys.maxsize > 0x7FFFFFFF:
1348 rawio = self.MockRawIO()
1349 bufio = self.tp(rawio)
1350 self.assertRaises((OverflowError, MemoryError, ValueError),
1351 bufio.__init__, rawio, sys.maxsize)
1352
1353 def test_initialization(self):
1354 rawio = self.MockRawIO()
1355 bufio = self.tp(rawio)
1356 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1357 self.assertRaises(ValueError, bufio.write, b"def")
1358 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1359 self.assertRaises(ValueError, bufio.write, b"def")
1360 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1361 self.assertRaises(ValueError, bufio.write, b"def")
1362
1363 def test_garbage_collection(self):
1364 # C BufferedWriter objects are collected, and collecting them flushes
1365 # all data to disk.
1366 # The Python version has __del__, so it ends into gc.garbage instead
1367 rawio = self.FileIO(support.TESTFN, "w+b")
1368 f = self.tp(rawio)
1369 f.write(b"123xxx")
1370 f.x = f
1371 wr = weakref.ref(f)
1372 del f
1373 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001374 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001375 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001376 self.assertEqual(f.read(), b"123xxx")
1377
R David Murray5b2cf5e2013-02-23 22:11:21 -05001378 def test_args_error(self):
1379 # Issue #17275
1380 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1381 self.tp(io.BytesIO(), 1024, 1024, 1024)
1382
Antoine Pitrou19690592009-06-12 20:14:08 +00001383
1384class PyBufferedWriterTest(BufferedWriterTest):
1385 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001386
1387class BufferedRWPairTest(unittest.TestCase):
1388
Antoine Pitrou19690592009-06-12 20:14:08 +00001389 def test_constructor(self):
1390 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001391 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001392
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001393 def test_uninitialized(self):
1394 pair = self.tp.__new__(self.tp)
1395 del pair
1396 pair = self.tp.__new__(self.tp)
1397 self.assertRaisesRegexp((ValueError, AttributeError),
1398 'uninitialized|has no attribute',
1399 pair.read, 0)
1400 self.assertRaisesRegexp((ValueError, AttributeError),
1401 'uninitialized|has no attribute',
1402 pair.write, b'')
1403 pair.__init__(self.MockRawIO(), self.MockRawIO())
1404 self.assertEqual(pair.read(0), b'')
1405 self.assertEqual(pair.write(b''), 0)
1406
Antoine Pitrou19690592009-06-12 20:14:08 +00001407 def test_detach(self):
1408 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1409 self.assertRaises(self.UnsupportedOperation, pair.detach)
1410
1411 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001412 with support.check_warnings(("max_buffer_size is deprecated",
1413 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001414 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001415
1416 def test_constructor_with_not_readable(self):
1417 class NotReadable(MockRawIO):
1418 def readable(self):
1419 return False
1420
1421 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1422
1423 def test_constructor_with_not_writeable(self):
1424 class NotWriteable(MockRawIO):
1425 def writable(self):
1426 return False
1427
1428 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1429
1430 def test_read(self):
1431 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1432
1433 self.assertEqual(pair.read(3), b"abc")
1434 self.assertEqual(pair.read(1), b"d")
1435 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001436 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1437 self.assertEqual(pair.read(None), b"abc")
1438
1439 def test_readlines(self):
1440 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1441 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1442 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1443 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001444
1445 def test_read1(self):
1446 # .read1() is delegated to the underlying reader object, so this test
1447 # can be shallow.
1448 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1449
1450 self.assertEqual(pair.read1(3), b"abc")
1451
1452 def test_readinto(self):
1453 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1454
1455 data = bytearray(5)
1456 self.assertEqual(pair.readinto(data), 5)
1457 self.assertEqual(data, b"abcde")
1458
1459 def test_write(self):
1460 w = self.MockRawIO()
1461 pair = self.tp(self.MockRawIO(), w)
1462
1463 pair.write(b"abc")
1464 pair.flush()
1465 pair.write(b"def")
1466 pair.flush()
1467 self.assertEqual(w._write_stack, [b"abc", b"def"])
1468
1469 def test_peek(self):
1470 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1471
1472 self.assertTrue(pair.peek(3).startswith(b"abc"))
1473 self.assertEqual(pair.read(3), b"abc")
1474
1475 def test_readable(self):
1476 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1477 self.assertTrue(pair.readable())
1478
1479 def test_writeable(self):
1480 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1481 self.assertTrue(pair.writable())
1482
1483 def test_seekable(self):
1484 # BufferedRWPairs are never seekable, even if their readers and writers
1485 # are.
1486 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1487 self.assertFalse(pair.seekable())
1488
1489 # .flush() is delegated to the underlying writer object and has been
1490 # tested in the test_write method.
1491
1492 def test_close_and_closed(self):
1493 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1494 self.assertFalse(pair.closed)
1495 pair.close()
1496 self.assertTrue(pair.closed)
1497
1498 def test_isatty(self):
1499 class SelectableIsAtty(MockRawIO):
1500 def __init__(self, isatty):
1501 MockRawIO.__init__(self)
1502 self._isatty = isatty
1503
1504 def isatty(self):
1505 return self._isatty
1506
1507 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1508 self.assertFalse(pair.isatty())
1509
1510 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1511 self.assertTrue(pair.isatty())
1512
1513 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1514 self.assertTrue(pair.isatty())
1515
1516 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1517 self.assertTrue(pair.isatty())
1518
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001519 def test_weakref_clearing(self):
1520 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1521 ref = weakref.ref(brw)
1522 brw = None
1523 ref = None # Shouldn't segfault.
1524
Antoine Pitrou19690592009-06-12 20:14:08 +00001525class CBufferedRWPairTest(BufferedRWPairTest):
1526 tp = io.BufferedRWPair
1527
1528class PyBufferedRWPairTest(BufferedRWPairTest):
1529 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001530
1531
Antoine Pitrou19690592009-06-12 20:14:08 +00001532class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1533 read_mode = "rb+"
1534 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001535
Antoine Pitrou19690592009-06-12 20:14:08 +00001536 def test_constructor(self):
1537 BufferedReaderTest.test_constructor(self)
1538 BufferedWriterTest.test_constructor(self)
1539
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001540 def test_uninitialized(self):
1541 BufferedReaderTest.test_uninitialized(self)
1542 BufferedWriterTest.test_uninitialized(self)
1543
Antoine Pitrou19690592009-06-12 20:14:08 +00001544 def test_read_and_write(self):
1545 raw = self.MockRawIO((b"asdf", b"ghjk"))
1546 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001547
1548 self.assertEqual(b"as", rw.read(2))
1549 rw.write(b"ddd")
1550 rw.write(b"eee")
1551 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001552 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001553 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001554
Antoine Pitrou19690592009-06-12 20:14:08 +00001555 def test_seek_and_tell(self):
1556 raw = self.BytesIO(b"asdfghjkl")
1557 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001558
Ezio Melotti2623a372010-11-21 13:34:58 +00001559 self.assertEqual(b"as", rw.read(2))
1560 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001561 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001562 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001563
Antoine Pitrou808cec52011-08-20 15:40:58 +02001564 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001565 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001566 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001567 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001568 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001569 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001570 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001571 self.assertEqual(7, rw.tell())
1572 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001573 rw.flush()
1574 self.assertEqual(b"asdf123fl", raw.getvalue())
1575
Christian Heimes1a6387e2008-03-26 12:49:49 +00001576 self.assertRaises(TypeError, rw.seek, 0.0)
1577
Antoine Pitrou19690592009-06-12 20:14:08 +00001578 def check_flush_and_read(self, read_func):
1579 raw = self.BytesIO(b"abcdefghi")
1580 bufio = self.tp(raw)
1581
Ezio Melotti2623a372010-11-21 13:34:58 +00001582 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001583 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001584 self.assertEqual(b"ef", read_func(bufio, 2))
1585 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001586 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001587 self.assertEqual(6, bufio.tell())
1588 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001589 raw.seek(0, 0)
1590 raw.write(b"XYZ")
1591 # flush() resets the read buffer
1592 bufio.flush()
1593 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001594 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001595
1596 def test_flush_and_read(self):
1597 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1598
1599 def test_flush_and_readinto(self):
1600 def _readinto(bufio, n=-1):
1601 b = bytearray(n if n >= 0 else 9999)
1602 n = bufio.readinto(b)
1603 return bytes(b[:n])
1604 self.check_flush_and_read(_readinto)
1605
1606 def test_flush_and_peek(self):
1607 def _peek(bufio, n=-1):
1608 # This relies on the fact that the buffer can contain the whole
1609 # raw stream, otherwise peek() can return less.
1610 b = bufio.peek(n)
1611 if n != -1:
1612 b = b[:n]
1613 bufio.seek(len(b), 1)
1614 return b
1615 self.check_flush_and_read(_peek)
1616
1617 def test_flush_and_write(self):
1618 raw = self.BytesIO(b"abcdefghi")
1619 bufio = self.tp(raw)
1620
1621 bufio.write(b"123")
1622 bufio.flush()
1623 bufio.write(b"45")
1624 bufio.flush()
1625 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001626 self.assertEqual(b"12345fghi", raw.getvalue())
1627 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001628
1629 def test_threads(self):
1630 BufferedReaderTest.test_threads(self)
1631 BufferedWriterTest.test_threads(self)
1632
1633 def test_writes_and_peek(self):
1634 def _peek(bufio):
1635 bufio.peek(1)
1636 self.check_writes(_peek)
1637 def _peek(bufio):
1638 pos = bufio.tell()
1639 bufio.seek(-1, 1)
1640 bufio.peek(1)
1641 bufio.seek(pos, 0)
1642 self.check_writes(_peek)
1643
1644 def test_writes_and_reads(self):
1645 def _read(bufio):
1646 bufio.seek(-1, 1)
1647 bufio.read(1)
1648 self.check_writes(_read)
1649
1650 def test_writes_and_read1s(self):
1651 def _read1(bufio):
1652 bufio.seek(-1, 1)
1653 bufio.read1(1)
1654 self.check_writes(_read1)
1655
1656 def test_writes_and_readintos(self):
1657 def _read(bufio):
1658 bufio.seek(-1, 1)
1659 bufio.readinto(bytearray(1))
1660 self.check_writes(_read)
1661
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001662 def test_write_after_readahead(self):
1663 # Issue #6629: writing after the buffer was filled by readahead should
1664 # first rewind the raw stream.
1665 for overwrite_size in [1, 5]:
1666 raw = self.BytesIO(b"A" * 10)
1667 bufio = self.tp(raw, 4)
1668 # Trigger readahead
1669 self.assertEqual(bufio.read(1), b"A")
1670 self.assertEqual(bufio.tell(), 1)
1671 # Overwriting should rewind the raw stream if it needs so
1672 bufio.write(b"B" * overwrite_size)
1673 self.assertEqual(bufio.tell(), overwrite_size + 1)
1674 # If the write size was smaller than the buffer size, flush() and
1675 # check that rewind happens.
1676 bufio.flush()
1677 self.assertEqual(bufio.tell(), overwrite_size + 1)
1678 s = raw.getvalue()
1679 self.assertEqual(s,
1680 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1681
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001682 def test_write_rewind_write(self):
1683 # Various combinations of reading / writing / seeking backwards / writing again
1684 def mutate(bufio, pos1, pos2):
1685 assert pos2 >= pos1
1686 # Fill the buffer
1687 bufio.seek(pos1)
1688 bufio.read(pos2 - pos1)
1689 bufio.write(b'\x02')
1690 # This writes earlier than the previous write, but still inside
1691 # the buffer.
1692 bufio.seek(pos1)
1693 bufio.write(b'\x01')
1694
1695 b = b"\x80\x81\x82\x83\x84"
1696 for i in range(0, len(b)):
1697 for j in range(i, len(b)):
1698 raw = self.BytesIO(b)
1699 bufio = self.tp(raw, 100)
1700 mutate(bufio, i, j)
1701 bufio.flush()
1702 expected = bytearray(b)
1703 expected[j] = 2
1704 expected[i] = 1
1705 self.assertEqual(raw.getvalue(), expected,
1706 "failed result for i=%d, j=%d" % (i, j))
1707
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001708 def test_truncate_after_read_or_write(self):
1709 raw = self.BytesIO(b"A" * 10)
1710 bufio = self.tp(raw, 100)
1711 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1712 self.assertEqual(bufio.truncate(), 2)
1713 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1714 self.assertEqual(bufio.truncate(), 4)
1715
Antoine Pitrou19690592009-06-12 20:14:08 +00001716 def test_misbehaved_io(self):
1717 BufferedReaderTest.test_misbehaved_io(self)
1718 BufferedWriterTest.test_misbehaved_io(self)
1719
Antoine Pitrou808cec52011-08-20 15:40:58 +02001720 def test_interleaved_read_write(self):
1721 # Test for issue #12213
1722 with self.BytesIO(b'abcdefgh') as raw:
1723 with self.tp(raw, 100) as f:
1724 f.write(b"1")
1725 self.assertEqual(f.read(1), b'b')
1726 f.write(b'2')
1727 self.assertEqual(f.read1(1), b'd')
1728 f.write(b'3')
1729 buf = bytearray(1)
1730 f.readinto(buf)
1731 self.assertEqual(buf, b'f')
1732 f.write(b'4')
1733 self.assertEqual(f.peek(1), b'h')
1734 f.flush()
1735 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1736
1737 with self.BytesIO(b'abc') as raw:
1738 with self.tp(raw, 100) as f:
1739 self.assertEqual(f.read(1), b'a')
1740 f.write(b"2")
1741 self.assertEqual(f.read(1), b'c')
1742 f.flush()
1743 self.assertEqual(raw.getvalue(), b'a2c')
1744
1745 def test_interleaved_readline_write(self):
1746 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1747 with self.tp(raw) as f:
1748 f.write(b'1')
1749 self.assertEqual(f.readline(), b'b\n')
1750 f.write(b'2')
1751 self.assertEqual(f.readline(), b'def\n')
1752 f.write(b'3')
1753 self.assertEqual(f.readline(), b'\n')
1754 f.flush()
1755 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1756
R David Murray5b2cf5e2013-02-23 22:11:21 -05001757
Antoine Pitroubff5df02012-07-29 19:02:46 +02001758class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1759 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001760 tp = io.BufferedRandom
1761
1762 def test_constructor(self):
1763 BufferedRandomTest.test_constructor(self)
1764 # The allocation can succeed on 32-bit builds, e.g. with more
1765 # than 2GB RAM and a 64-bit kernel.
1766 if sys.maxsize > 0x7FFFFFFF:
1767 rawio = self.MockRawIO()
1768 bufio = self.tp(rawio)
1769 self.assertRaises((OverflowError, MemoryError, ValueError),
1770 bufio.__init__, rawio, sys.maxsize)
1771
1772 def test_garbage_collection(self):
1773 CBufferedReaderTest.test_garbage_collection(self)
1774 CBufferedWriterTest.test_garbage_collection(self)
1775
R David Murray5b2cf5e2013-02-23 22:11:21 -05001776 def test_args_error(self):
1777 # Issue #17275
1778 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1779 self.tp(io.BytesIO(), 1024, 1024, 1024)
1780
1781
Antoine Pitrou19690592009-06-12 20:14:08 +00001782class PyBufferedRandomTest(BufferedRandomTest):
1783 tp = pyio.BufferedRandom
1784
1785
Christian Heimes1a6387e2008-03-26 12:49:49 +00001786# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1787# properties:
1788# - A single output character can correspond to many bytes of input.
1789# - The number of input bytes to complete the character can be
1790# undetermined until the last input byte is received.
1791# - The number of input bytes can vary depending on previous input.
1792# - A single input byte can correspond to many characters of output.
1793# - The number of output characters can be undetermined until the
1794# last input byte is received.
1795# - The number of output characters can vary depending on previous input.
1796
1797class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1798 """
1799 For testing seek/tell behavior with a stateful, buffering decoder.
1800
1801 Input is a sequence of words. Words may be fixed-length (length set
1802 by input) or variable-length (period-terminated). In variable-length
1803 mode, extra periods are ignored. Possible words are:
1804 - 'i' followed by a number sets the input length, I (maximum 99).
1805 When I is set to 0, words are space-terminated.
1806 - 'o' followed by a number sets the output length, O (maximum 99).
1807 - Any other word is converted into a word followed by a period on
1808 the output. The output word consists of the input word truncated
1809 or padded out with hyphens to make its length equal to O. If O
1810 is 0, the word is output verbatim without truncating or padding.
1811 I and O are initially set to 1. When I changes, any buffered input is
1812 re-scanned according to the new I. EOF also terminates the last word.
1813 """
1814
1815 def __init__(self, errors='strict'):
1816 codecs.IncrementalDecoder.__init__(self, errors)
1817 self.reset()
1818
1819 def __repr__(self):
1820 return '<SID %x>' % id(self)
1821
1822 def reset(self):
1823 self.i = 1
1824 self.o = 1
1825 self.buffer = bytearray()
1826
1827 def getstate(self):
1828 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1829 return bytes(self.buffer), i*100 + o
1830
1831 def setstate(self, state):
1832 buffer, io = state
1833 self.buffer = bytearray(buffer)
1834 i, o = divmod(io, 100)
1835 self.i, self.o = i ^ 1, o ^ 1
1836
1837 def decode(self, input, final=False):
1838 output = ''
1839 for b in input:
1840 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001841 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001842 if self.buffer:
1843 output += self.process_word()
1844 else:
1845 self.buffer.append(b)
1846 else: # fixed-length, terminate after self.i bytes
1847 self.buffer.append(b)
1848 if len(self.buffer) == self.i:
1849 output += self.process_word()
1850 if final and self.buffer: # EOF terminates the last word
1851 output += self.process_word()
1852 return output
1853
1854 def process_word(self):
1855 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001856 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001857 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001858 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1860 else:
1861 output = self.buffer.decode('ascii')
1862 if len(output) < self.o:
1863 output += '-'*self.o # pad out with hyphens
1864 if self.o:
1865 output = output[:self.o] # truncate to output length
1866 output += '.'
1867 self.buffer = bytearray()
1868 return output
1869
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001870 codecEnabled = False
1871
1872 @classmethod
1873 def lookupTestDecoder(cls, name):
1874 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001875 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001876 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001877 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001878 incrementalencoder=None,
1879 streamreader=None, streamwriter=None,
1880 incrementaldecoder=cls)
1881
1882# Register the previous decoder for testing.
1883# Disabled by default, tests will enable it.
1884codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1885
1886
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887class StatefulIncrementalDecoderTest(unittest.TestCase):
1888 """
1889 Make sure the StatefulIncrementalDecoder actually works.
1890 """
1891
1892 test_cases = [
1893 # I=1, O=1 (fixed-length input == fixed-length output)
1894 (b'abcd', False, 'a.b.c.d.'),
1895 # I=0, O=0 (variable-length input, variable-length output)
1896 (b'oiabcd', True, 'abcd.'),
1897 # I=0, O=0 (should ignore extra periods)
1898 (b'oi...abcd...', True, 'abcd.'),
1899 # I=0, O=6 (variable-length input, fixed-length output)
1900 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1901 # I=2, O=6 (fixed-length input < fixed-length output)
1902 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1903 # I=6, O=3 (fixed-length input > fixed-length output)
1904 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1905 # I=0, then 3; O=29, then 15 (with longer output)
1906 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1907 'a----------------------------.' +
1908 'b----------------------------.' +
1909 'cde--------------------------.' +
1910 'abcdefghijabcde.' +
1911 'a.b------------.' +
1912 '.c.------------.' +
1913 'd.e------------.' +
1914 'k--------------.' +
1915 'l--------------.' +
1916 'm--------------.')
1917 ]
1918
Antoine Pitrou19690592009-06-12 20:14:08 +00001919 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001920 # Try a few one-shot test cases.
1921 for input, eof, output in self.test_cases:
1922 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001923 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001924
1925 # Also test an unfinished decode, followed by forcing EOF.
1926 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001927 self.assertEqual(d.decode(b'oiabcd'), '')
1928 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001929
1930class TextIOWrapperTest(unittest.TestCase):
1931
1932 def setUp(self):
1933 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1934 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001935 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001936
1937 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001938 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939
Antoine Pitrou19690592009-06-12 20:14:08 +00001940 def test_constructor(self):
1941 r = self.BytesIO(b"\xc3\xa9\n\n")
1942 b = self.BufferedReader(r, 1000)
1943 t = self.TextIOWrapper(b)
1944 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001945 self.assertEqual(t.encoding, "latin1")
1946 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001947 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001948 self.assertEqual(t.encoding, "utf8")
1949 self.assertEqual(t.line_buffering, True)
1950 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001951 self.assertRaises(TypeError, t.__init__, b, newline=42)
1952 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1953
1954 def test_detach(self):
1955 r = self.BytesIO()
1956 b = self.BufferedWriter(r)
1957 t = self.TextIOWrapper(b)
1958 self.assertIs(t.detach(), b)
1959
1960 t = self.TextIOWrapper(b, encoding="ascii")
1961 t.write("howdy")
1962 self.assertFalse(r.getvalue())
1963 t.detach()
1964 self.assertEqual(r.getvalue(), b"howdy")
1965 self.assertRaises(ValueError, t.detach)
1966
Benjamin Peterson53ae6142014-12-21 20:51:50 -06001967 # Operations independent of the detached stream should still work
1968 repr(t)
1969 self.assertEqual(t.encoding, "ascii")
1970 self.assertEqual(t.errors, "strict")
1971 self.assertFalse(t.line_buffering)
1972
Antoine Pitrou19690592009-06-12 20:14:08 +00001973 def test_repr(self):
1974 raw = self.BytesIO("hello".encode("utf-8"))
1975 b = self.BufferedReader(raw)
1976 t = self.TextIOWrapper(b, encoding="utf-8")
1977 modname = self.TextIOWrapper.__module__
1978 self.assertEqual(repr(t),
1979 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1980 raw.name = "dummy"
1981 self.assertEqual(repr(t),
1982 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1983 raw.name = b"dummy"
1984 self.assertEqual(repr(t),
1985 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1986
Benjamin Peterson53ae6142014-12-21 20:51:50 -06001987 t.buffer.detach()
1988 repr(t) # Should not raise an exception
1989
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 def test_line_buffering(self):
1991 r = self.BytesIO()
1992 b = self.BufferedWriter(r, 1000)
1993 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1994 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001995 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001996 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001997 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001998 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001999 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000
Antoine Pitrou19690592009-06-12 20:14:08 +00002001 def test_encoding(self):
2002 # Check the encoding attribute is always set, and valid
2003 b = self.BytesIO()
2004 t = self.TextIOWrapper(b, encoding="utf8")
2005 self.assertEqual(t.encoding, "utf8")
2006 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002007 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002008 codecs.lookup(t.encoding)
2009
2010 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002011 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002012 b = self.BytesIO(b"abc\n\xff\n")
2013 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002014 self.assertRaises(UnicodeError, t.read)
2015 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002016 b = self.BytesIO(b"abc\n\xff\n")
2017 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002018 self.assertRaises(UnicodeError, t.read)
2019 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 b = self.BytesIO(b"abc\n\xff\n")
2021 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00002022 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002023 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002024 b = self.BytesIO(b"abc\n\xff\n")
2025 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00002026 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027
Antoine Pitrou19690592009-06-12 20:14:08 +00002028 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002029 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00002030 b = self.BytesIO()
2031 t = self.TextIOWrapper(b, encoding="ascii")
2032 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002033 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00002034 b = self.BytesIO()
2035 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2036 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002037 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00002038 b = self.BytesIO()
2039 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002040 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002041 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002042 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002043 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002044 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002045 b = self.BytesIO()
2046 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002047 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002048 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002049 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002050 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002051
Antoine Pitrou19690592009-06-12 20:14:08 +00002052 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2054
2055 tests = [
2056 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2057 [ '', input_lines ],
2058 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2059 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2060 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2061 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002062 encodings = (
2063 'utf-8', 'latin-1',
2064 'utf-16', 'utf-16-le', 'utf-16-be',
2065 'utf-32', 'utf-32-le', 'utf-32-be',
2066 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002067
2068 # Try a range of buffer sizes to test the case where \r is the last
2069 # character in TextIOWrapper._pending_line.
2070 for encoding in encodings:
2071 # XXX: str.encode() should return bytes
2072 data = bytes(''.join(input_lines).encode(encoding))
2073 for do_reads in (False, True):
2074 for bufsize in range(1, 10):
2075 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002076 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2077 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002078 encoding=encoding)
2079 if do_reads:
2080 got_lines = []
2081 while True:
2082 c2 = textio.read(2)
2083 if c2 == '':
2084 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002085 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002086 got_lines.append(c2 + textio.readline())
2087 else:
2088 got_lines = list(textio)
2089
2090 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002091 self.assertEqual(got_line, exp_line)
2092 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002093
Antoine Pitrou19690592009-06-12 20:14:08 +00002094 def test_newlines_input(self):
2095 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002096 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2097 for newline, expected in [
2098 (None, normalized.decode("ascii").splitlines(True)),
2099 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002100 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2101 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2102 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002103 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002104 buf = self.BytesIO(testdata)
2105 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002106 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002108 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002109
Antoine Pitrou19690592009-06-12 20:14:08 +00002110 def test_newlines_output(self):
2111 testdict = {
2112 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2113 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2114 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2115 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2116 }
2117 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2118 for newline, expected in tests:
2119 buf = self.BytesIO()
2120 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2121 txt.write("AAA\nB")
2122 txt.write("BB\nCCC\n")
2123 txt.write("X\rY\r\nZ")
2124 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002125 self.assertEqual(buf.closed, False)
2126 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002127
2128 def test_destructor(self):
2129 l = []
2130 base = self.BytesIO
2131 class MyBytesIO(base):
2132 def close(self):
2133 l.append(self.getvalue())
2134 base.close(self)
2135 b = MyBytesIO()
2136 t = self.TextIOWrapper(b, encoding="ascii")
2137 t.write("abc")
2138 del t
2139 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002140 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002141
2142 def test_override_destructor(self):
2143 record = []
2144 class MyTextIO(self.TextIOWrapper):
2145 def __del__(self):
2146 record.append(1)
2147 try:
2148 f = super(MyTextIO, self).__del__
2149 except AttributeError:
2150 pass
2151 else:
2152 f()
2153 def close(self):
2154 record.append(2)
2155 super(MyTextIO, self).close()
2156 def flush(self):
2157 record.append(3)
2158 super(MyTextIO, self).flush()
2159 b = self.BytesIO()
2160 t = MyTextIO(b, encoding="ascii")
2161 del t
2162 support.gc_collect()
2163 self.assertEqual(record, [1, 2, 3])
2164
2165 def test_error_through_destructor(self):
2166 # Test that the exception state is not modified by a destructor,
2167 # even if close() fails.
2168 rawio = self.CloseFailureIO()
2169 def f():
2170 self.TextIOWrapper(rawio).xyzzy
2171 with support.captured_output("stderr") as s:
2172 self.assertRaises(AttributeError, f)
2173 s = s.getvalue().strip()
2174 if s:
2175 # The destructor *may* have printed an unraisable error, check it
2176 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002177 self.assertTrue(s.startswith("Exception IOError: "), s)
2178 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179
2180 # Systematic tests of the text I/O API
2181
Antoine Pitrou19690592009-06-12 20:14:08 +00002182 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002183 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2184 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002185 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002186 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002187 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002188 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002189 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002190 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002191 self.assertEqual(f.tell(), 0)
2192 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002193 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002194 self.assertEqual(f.seek(0), 0)
2195 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002196 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002197 self.assertEqual(f.read(2), "ab")
2198 self.assertEqual(f.read(1), "c")
2199 self.assertEqual(f.read(1), "")
2200 self.assertEqual(f.read(), "")
2201 self.assertEqual(f.tell(), cookie)
2202 self.assertEqual(f.seek(0), 0)
2203 self.assertEqual(f.seek(0, 2), cookie)
2204 self.assertEqual(f.write("def"), 3)
2205 self.assertEqual(f.seek(cookie), cookie)
2206 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002207 if enc.startswith("utf"):
2208 self.multi_line_test(f, enc)
2209 f.close()
2210
2211 def multi_line_test(self, f, enc):
2212 f.seek(0)
2213 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002214 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002215 wlines = []
2216 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2217 chars = []
2218 for i in range(size):
2219 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002220 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002221 wlines.append((f.tell(), line))
2222 f.write(line)
2223 f.seek(0)
2224 rlines = []
2225 while True:
2226 pos = f.tell()
2227 line = f.readline()
2228 if not line:
2229 break
2230 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002231 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002232
Antoine Pitrou19690592009-06-12 20:14:08 +00002233 def test_telling(self):
2234 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002236 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002237 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002238 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002239 p2 = f.tell()
2240 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002241 self.assertEqual(f.tell(), p0)
2242 self.assertEqual(f.readline(), "\xff\n")
2243 self.assertEqual(f.tell(), p1)
2244 self.assertEqual(f.readline(), "\xff\n")
2245 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002246 f.seek(0)
2247 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002248 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002249 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002250 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002251 f.close()
2252
Antoine Pitrou19690592009-06-12 20:14:08 +00002253 def test_seeking(self):
2254 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255 prefix_size = chunk_size - 2
2256 u_prefix = "a" * prefix_size
2257 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002258 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002259 u_suffix = "\u8888\n"
2260 suffix = bytes(u_suffix.encode("utf-8"))
2261 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002262 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002263 f.write(line*2)
2264 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002265 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002266 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002267 self.assertEqual(s, prefix.decode("ascii"))
2268 self.assertEqual(f.tell(), prefix_size)
2269 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002270
Antoine Pitrou19690592009-06-12 20:14:08 +00002271 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002272 # Regression test for a specific bug
2273 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002274 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002275 f.write(data)
2276 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002277 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002278 f._CHUNK_SIZE # Just test that it exists
2279 f._CHUNK_SIZE = 2
2280 f.readline()
2281 f.tell()
2282
Antoine Pitrou19690592009-06-12 20:14:08 +00002283 def test_seek_and_tell(self):
2284 #Test seek/tell using the StatefulIncrementalDecoder.
2285 # Make test faster by doing smaller seeks
2286 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002287
Antoine Pitrou19690592009-06-12 20:14:08 +00002288 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002289 """Tell/seek to various points within a data stream and ensure
2290 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002291 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002292 f.write(data)
2293 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002294 f = self.open(support.TESTFN, encoding='test_decoder')
2295 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002296 decoded = f.read()
2297 f.close()
2298
2299 for i in range(min_pos, len(decoded) + 1): # seek positions
2300 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002301 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002302 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002303 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002304 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002305 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002306 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002307 f.close()
2308
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002309 # Enable the test decoder.
2310 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002311
2312 # Run the tests.
2313 try:
2314 # Try each test case.
2315 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002316 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002317
2318 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002319 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2320 offset = CHUNK_SIZE - len(input)//2
2321 prefix = b'.'*offset
2322 # Don't bother seeking into the prefix (takes too long).
2323 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002324 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002325
2326 # Ensure our test decoder won't interfere with subsequent tests.
2327 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002328 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002329
Antoine Pitrou19690592009-06-12 20:14:08 +00002330 def test_encoded_writes(self):
2331 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002332 tests = ("utf-16",
2333 "utf-16-le",
2334 "utf-16-be",
2335 "utf-32",
2336 "utf-32-le",
2337 "utf-32-be")
2338 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002339 buf = self.BytesIO()
2340 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002341 # Check if the BOM is written only once (see issue1753).
2342 f.write(data)
2343 f.write(data)
2344 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002345 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002346 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002347 self.assertEqual(f.read(), data * 2)
2348 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002349
Antoine Pitrou19690592009-06-12 20:14:08 +00002350 def test_unreadable(self):
2351 class UnReadable(self.BytesIO):
2352 def readable(self):
2353 return False
2354 txt = self.TextIOWrapper(UnReadable())
2355 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002356
Antoine Pitrou19690592009-06-12 20:14:08 +00002357 def test_read_one_by_one(self):
2358 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002359 reads = ""
2360 while True:
2361 c = txt.read(1)
2362 if not c:
2363 break
2364 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002365 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002366
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002367 def test_readlines(self):
2368 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2369 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2370 txt.seek(0)
2371 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2372 txt.seek(0)
2373 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2374
Christian Heimes1a6387e2008-03-26 12:49:49 +00002375 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002376 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002377 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002378 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002379 reads = ""
2380 while True:
2381 c = txt.read(128)
2382 if not c:
2383 break
2384 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002385 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002386
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002387 def test_writelines(self):
2388 l = ['ab', 'cd', 'ef']
2389 buf = self.BytesIO()
2390 txt = self.TextIOWrapper(buf)
2391 txt.writelines(l)
2392 txt.flush()
2393 self.assertEqual(buf.getvalue(), b'abcdef')
2394
2395 def test_writelines_userlist(self):
2396 l = UserList(['ab', 'cd', 'ef'])
2397 buf = self.BytesIO()
2398 txt = self.TextIOWrapper(buf)
2399 txt.writelines(l)
2400 txt.flush()
2401 self.assertEqual(buf.getvalue(), b'abcdef')
2402
2403 def test_writelines_error(self):
2404 txt = self.TextIOWrapper(self.BytesIO())
2405 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2406 self.assertRaises(TypeError, txt.writelines, None)
2407 self.assertRaises(TypeError, txt.writelines, b'abc')
2408
Christian Heimes1a6387e2008-03-26 12:49:49 +00002409 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002410 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002411
2412 # read one char at a time
2413 reads = ""
2414 while True:
2415 c = txt.read(1)
2416 if not c:
2417 break
2418 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002419 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002420
2421 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002422 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002423 txt._CHUNK_SIZE = 4
2424
2425 reads = ""
2426 while True:
2427 c = txt.read(4)
2428 if not c:
2429 break
2430 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002431 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002432
2433 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002434 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002435 txt._CHUNK_SIZE = 4
2436
2437 reads = txt.read(4)
2438 reads += txt.read(4)
2439 reads += txt.readline()
2440 reads += txt.readline()
2441 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002442 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002443
2444 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002445 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002446 txt._CHUNK_SIZE = 4
2447
2448 reads = txt.read(4)
2449 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002450 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002451
2452 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002453 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002454 txt._CHUNK_SIZE = 4
2455
2456 reads = txt.read(4)
2457 pos = txt.tell()
2458 txt.seek(0)
2459 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002460 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002461
2462 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002463 buffer = self.BytesIO(self.testdata)
2464 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002465
2466 self.assertEqual(buffer.seekable(), txt.seekable())
2467
Antoine Pitrou19690592009-06-12 20:14:08 +00002468 def test_append_bom(self):
2469 # The BOM is not written again when appending to a non-empty file
2470 filename = support.TESTFN
2471 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2472 with self.open(filename, 'w', encoding=charset) as f:
2473 f.write('aaa')
2474 pos = f.tell()
2475 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002476 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002477
2478 with self.open(filename, 'a', encoding=charset) as f:
2479 f.write('xxx')
2480 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002481 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002482
Antoine Pitrou19690592009-06-12 20:14:08 +00002483 def test_seek_bom(self):
2484 # Same test, but when seeking manually
2485 filename = support.TESTFN
2486 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2487 with self.open(filename, 'w', encoding=charset) as f:
2488 f.write('aaa')
2489 pos = f.tell()
2490 with self.open(filename, 'r+', encoding=charset) as f:
2491 f.seek(pos)
2492 f.write('zzz')
2493 f.seek(0)
2494 f.write('bbb')
2495 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002496 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002497
2498 def test_errors_property(self):
2499 with self.open(support.TESTFN, "w") as f:
2500 self.assertEqual(f.errors, "strict")
2501 with self.open(support.TESTFN, "w", errors="replace") as f:
2502 self.assertEqual(f.errors, "replace")
2503
Victor Stinner6a102812010-04-27 23:55:59 +00002504 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002505 def test_threads_write(self):
2506 # Issue6750: concurrent writes could duplicate data
2507 event = threading.Event()
2508 with self.open(support.TESTFN, "w", buffering=1) as f:
2509 def run(n):
2510 text = "Thread%03d\n" % n
2511 event.wait()
2512 f.write(text)
2513 threads = [threading.Thread(target=lambda n=x: run(n))
2514 for x in range(20)]
2515 for t in threads:
2516 t.start()
2517 time.sleep(0.02)
2518 event.set()
2519 for t in threads:
2520 t.join()
2521 with self.open(support.TESTFN) as f:
2522 content = f.read()
2523 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002524 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002525
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002526 def test_flush_error_on_close(self):
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002527 # Test that text file is closed despite failed flush
2528 # and that flush() is called before file closed.
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002529 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002530 closed = []
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002531 def bad_flush():
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002532 closed[:] = [txt.closed, txt.buffer.closed]
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002533 raise IOError()
2534 txt.flush = bad_flush
2535 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002536 self.assertTrue(txt.closed)
Serhiy Storchaka3173f7c2015-02-21 00:34:20 +02002537 self.assertTrue(txt.buffer.closed)
2538 self.assertTrue(closed) # flush() called
2539 self.assertFalse(closed[0]) # flush() called before file closed
2540 self.assertFalse(closed[1])
Serhiy Storchaka437d5352015-02-23 00:28:38 +02002541 txt.flush = lambda: None # break reference loop
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002542
2543 def test_multi_close(self):
2544 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2545 txt.close()
2546 txt.close()
2547 txt.close()
2548 self.assertRaises(ValueError, txt.flush)
2549
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002550 def test_readonly_attributes(self):
2551 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2552 buf = self.BytesIO(self.testdata)
2553 with self.assertRaises((AttributeError, TypeError)):
2554 txt.buffer = buf
2555
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002556 def test_read_nonbytes(self):
2557 # Issue #17106
2558 # Crash when underlying read() returns non-bytes
2559 class NonbytesStream(self.StringIO):
2560 read1 = self.StringIO.read
2561 class NonbytesStream(self.StringIO):
2562 read1 = self.StringIO.read
2563 t = self.TextIOWrapper(NonbytesStream('a'))
2564 with self.maybeRaises(TypeError):
2565 t.read(1)
2566 t = self.TextIOWrapper(NonbytesStream('a'))
2567 with self.maybeRaises(TypeError):
2568 t.readline()
2569 t = self.TextIOWrapper(NonbytesStream('a'))
2570 self.assertEqual(t.read(), u'a')
2571
2572 def test_illegal_decoder(self):
2573 # Issue #17106
2574 # Crash when decoder returns non-string
2575 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2576 encoding='quopri_codec')
2577 with self.maybeRaises(TypeError):
2578 t.read(1)
2579 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2580 encoding='quopri_codec')
2581 with self.maybeRaises(TypeError):
2582 t.readline()
2583 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2584 encoding='quopri_codec')
2585 with self.maybeRaises(TypeError):
2586 t.read()
2587
2588
Antoine Pitrou19690592009-06-12 20:14:08 +00002589class CTextIOWrapperTest(TextIOWrapperTest):
2590
2591 def test_initialization(self):
2592 r = self.BytesIO(b"\xc3\xa9\n\n")
2593 b = self.BufferedReader(r, 1000)
2594 t = self.TextIOWrapper(b)
2595 self.assertRaises(TypeError, t.__init__, b, newline=42)
2596 self.assertRaises(ValueError, t.read)
2597 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2598 self.assertRaises(ValueError, t.read)
2599
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002600 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2601 self.assertRaises(Exception, repr, t)
2602
Antoine Pitrou19690592009-06-12 20:14:08 +00002603 def test_garbage_collection(self):
2604 # C TextIOWrapper objects are collected, and collecting them flushes
2605 # all data to disk.
2606 # The Python version has __del__, so it ends in gc.garbage instead.
2607 rawio = io.FileIO(support.TESTFN, "wb")
2608 b = self.BufferedWriter(rawio)
2609 t = self.TextIOWrapper(b, encoding="ascii")
2610 t.write("456def")
2611 t.x = t
2612 wr = weakref.ref(t)
2613 del t
2614 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002615 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002616 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002617 self.assertEqual(f.read(), b"456def")
2618
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002619 def test_rwpair_cleared_before_textio(self):
2620 # Issue 13070: TextIOWrapper's finalization would crash when called
2621 # after the reference to the underlying BufferedRWPair's writer got
2622 # cleared by the GC.
2623 for i in range(1000):
2624 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2625 t1 = self.TextIOWrapper(b1, encoding="ascii")
2626 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2627 t2 = self.TextIOWrapper(b2, encoding="ascii")
2628 # circular references
2629 t1.buddy = t2
2630 t2.buddy = t1
2631 support.gc_collect()
2632
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002633 maybeRaises = unittest.TestCase.assertRaises
2634
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002635
Antoine Pitrou19690592009-06-12 20:14:08 +00002636class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002637 @contextlib.contextmanager
2638 def maybeRaises(self, *args, **kwds):
2639 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002640
2641
2642class IncrementalNewlineDecoderTest(unittest.TestCase):
2643
2644 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002645 # UTF-8 specific tests for a newline decoder
2646 def _check_decode(b, s, **kwargs):
2647 # We exercise getstate() / setstate() as well as decode()
2648 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002649 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002650 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002651 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002652
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002653 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002654
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002655 _check_decode(b'\xe8', "")
2656 _check_decode(b'\xa2', "")
2657 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002658
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002659 _check_decode(b'\xe8', "")
2660 _check_decode(b'\xa2', "")
2661 _check_decode(b'\x88', "\u8888")
2662
2663 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002664 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2665
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002666 decoder.reset()
2667 _check_decode(b'\n', "\n")
2668 _check_decode(b'\r', "")
2669 _check_decode(b'', "\n", final=True)
2670 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002671
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002672 _check_decode(b'\r', "")
2673 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002674
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002675 _check_decode(b'\r\r\n', "\n\n")
2676 _check_decode(b'\r', "")
2677 _check_decode(b'\r', "\n")
2678 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002679
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002680 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2681 _check_decode(b'\xe8\xa2\x88', "\u8888")
2682 _check_decode(b'\n', "\n")
2683 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2684 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002685
Antoine Pitrou19690592009-06-12 20:14:08 +00002686 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002687 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002688 if encoding is not None:
2689 encoder = codecs.getincrementalencoder(encoding)()
2690 def _decode_bytewise(s):
2691 # Decode one byte at a time
2692 for b in encoder.encode(s):
2693 result.append(decoder.decode(b))
2694 else:
2695 encoder = None
2696 def _decode_bytewise(s):
2697 # Decode one char at a time
2698 for c in s:
2699 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002700 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002701 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002702 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002703 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002704 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002705 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002706 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002707 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002708 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002709 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002710 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002711 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002712 input = "abc"
2713 if encoder is not None:
2714 encoder.reset()
2715 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002716 self.assertEqual(decoder.decode(input), "abc")
2717 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002718
2719 def test_newline_decoder(self):
2720 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002721 # None meaning the IncrementalNewlineDecoder takes unicode input
2722 # rather than bytes input
2723 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002724 'utf-16', 'utf-16-le', 'utf-16-be',
2725 'utf-32', 'utf-32-le', 'utf-32-be',
2726 )
2727 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002728 decoder = enc and codecs.getincrementaldecoder(enc)()
2729 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2730 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002731 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002732 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2733 self.check_newline_decoding_utf8(decoder)
2734
2735 def test_newline_bytes(self):
2736 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2737 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002738 self.assertEqual(dec.newlines, None)
2739 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2740 self.assertEqual(dec.newlines, None)
2741 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2742 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002743 dec = self.IncrementalNewlineDecoder(None, translate=False)
2744 _check(dec)
2745 dec = self.IncrementalNewlineDecoder(None, translate=True)
2746 _check(dec)
2747
2748class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2749 pass
2750
2751class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2752 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002753
Christian Heimes1a6387e2008-03-26 12:49:49 +00002754
2755# XXX Tests for open()
2756
2757class MiscIOTest(unittest.TestCase):
2758
Benjamin Petersonad100c32008-11-20 22:06:22 +00002759 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002760 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002761
Antoine Pitrou19690592009-06-12 20:14:08 +00002762 def test___all__(self):
2763 for name in self.io.__all__:
2764 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002765 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002766 if name == "open":
2767 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002768 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002769 self.assertTrue(issubclass(obj, Exception), name)
2770 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002771 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002772
Benjamin Petersonad100c32008-11-20 22:06:22 +00002773 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002774 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002775 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002776 f.close()
2777
Antoine Pitrou19690592009-06-12 20:14:08 +00002778 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002779 self.assertEqual(f.name, support.TESTFN)
2780 self.assertEqual(f.buffer.name, support.TESTFN)
2781 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2782 self.assertEqual(f.mode, "U")
2783 self.assertEqual(f.buffer.mode, "rb")
2784 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002785 f.close()
2786
Antoine Pitrou19690592009-06-12 20:14:08 +00002787 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002788 self.assertEqual(f.mode, "w+")
2789 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2790 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002791
Antoine Pitrou19690592009-06-12 20:14:08 +00002792 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002793 self.assertEqual(g.mode, "wb")
2794 self.assertEqual(g.raw.mode, "wb")
2795 self.assertEqual(g.name, f.fileno())
2796 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002797 f.close()
2798 g.close()
2799
Antoine Pitrou19690592009-06-12 20:14:08 +00002800 def test_io_after_close(self):
2801 for kwargs in [
2802 {"mode": "w"},
2803 {"mode": "wb"},
2804 {"mode": "w", "buffering": 1},
2805 {"mode": "w", "buffering": 2},
2806 {"mode": "wb", "buffering": 0},
2807 {"mode": "r"},
2808 {"mode": "rb"},
2809 {"mode": "r", "buffering": 1},
2810 {"mode": "r", "buffering": 2},
2811 {"mode": "rb", "buffering": 0},
2812 {"mode": "w+"},
2813 {"mode": "w+b"},
2814 {"mode": "w+", "buffering": 1},
2815 {"mode": "w+", "buffering": 2},
2816 {"mode": "w+b", "buffering": 0},
2817 ]:
2818 f = self.open(support.TESTFN, **kwargs)
2819 f.close()
2820 self.assertRaises(ValueError, f.flush)
2821 self.assertRaises(ValueError, f.fileno)
2822 self.assertRaises(ValueError, f.isatty)
2823 self.assertRaises(ValueError, f.__iter__)
2824 if hasattr(f, "peek"):
2825 self.assertRaises(ValueError, f.peek, 1)
2826 self.assertRaises(ValueError, f.read)
2827 if hasattr(f, "read1"):
2828 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002829 if hasattr(f, "readall"):
2830 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002831 if hasattr(f, "readinto"):
2832 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2833 self.assertRaises(ValueError, f.readline)
2834 self.assertRaises(ValueError, f.readlines)
2835 self.assertRaises(ValueError, f.seek, 0)
2836 self.assertRaises(ValueError, f.tell)
2837 self.assertRaises(ValueError, f.truncate)
2838 self.assertRaises(ValueError, f.write,
2839 b"" if "b" in kwargs['mode'] else "")
2840 self.assertRaises(ValueError, f.writelines, [])
2841 self.assertRaises(ValueError, next, f)
2842
2843 def test_blockingioerror(self):
2844 # Various BlockingIOError issues
2845 self.assertRaises(TypeError, self.BlockingIOError)
2846 self.assertRaises(TypeError, self.BlockingIOError, 1)
2847 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2848 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2849 b = self.BlockingIOError(1, "")
2850 self.assertEqual(b.characters_written, 0)
2851 class C(unicode):
2852 pass
2853 c = C("")
2854 b = self.BlockingIOError(1, c)
2855 c.b = b
2856 b.c = c
2857 wr = weakref.ref(c)
2858 del c, b
2859 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002860 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002861
2862 def test_abcs(self):
2863 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002864 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2865 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2866 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2867 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002868
2869 def _check_abc_inheritance(self, abcmodule):
2870 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002871 self.assertIsInstance(f, abcmodule.IOBase)
2872 self.assertIsInstance(f, abcmodule.RawIOBase)
2873 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2874 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002875 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002876 self.assertIsInstance(f, abcmodule.IOBase)
2877 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2878 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2879 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002880 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002881 self.assertIsInstance(f, abcmodule.IOBase)
2882 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2883 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2884 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002885
2886 def test_abc_inheritance(self):
2887 # Test implementations inherit from their respective ABCs
2888 self._check_abc_inheritance(self)
2889
2890 def test_abc_inheritance_official(self):
2891 # Test implementations inherit from the official ABCs of the
2892 # baseline "io" module.
2893 self._check_abc_inheritance(io)
2894
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002895 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2896 def test_nonblock_pipe_write_bigbuf(self):
2897 self._test_nonblock_pipe_write(16*1024)
2898
2899 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2900 def test_nonblock_pipe_write_smallbuf(self):
2901 self._test_nonblock_pipe_write(1024)
2902
2903 def _set_non_blocking(self, fd):
2904 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2905 self.assertNotEqual(flags, -1)
2906 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2907 self.assertEqual(res, 0)
2908
2909 def _test_nonblock_pipe_write(self, bufsize):
2910 sent = []
2911 received = []
2912 r, w = os.pipe()
2913 self._set_non_blocking(r)
2914 self._set_non_blocking(w)
2915
2916 # To exercise all code paths in the C implementation we need
2917 # to play with buffer sizes. For instance, if we choose a
2918 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2919 # then we will never get a partial write of the buffer.
2920 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2921 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2922
2923 with rf, wf:
2924 for N in 9999, 73, 7574:
2925 try:
2926 i = 0
2927 while True:
2928 msg = bytes([i % 26 + 97]) * N
2929 sent.append(msg)
2930 wf.write(msg)
2931 i += 1
2932
2933 except self.BlockingIOError as e:
2934 self.assertEqual(e.args[0], errno.EAGAIN)
2935 sent[-1] = sent[-1][:e.characters_written]
2936 received.append(rf.read())
2937 msg = b'BLOCKED'
2938 wf.write(msg)
2939 sent.append(msg)
2940
2941 while True:
2942 try:
2943 wf.flush()
2944 break
2945 except self.BlockingIOError as e:
2946 self.assertEqual(e.args[0], errno.EAGAIN)
2947 self.assertEqual(e.characters_written, 0)
2948 received.append(rf.read())
2949
2950 received += iter(rf.read, None)
2951
2952 sent, received = b''.join(sent), b''.join(received)
2953 self.assertTrue(sent == received)
2954 self.assertTrue(wf.closed)
2955 self.assertTrue(rf.closed)
2956
Antoine Pitrou19690592009-06-12 20:14:08 +00002957class CMiscIOTest(MiscIOTest):
2958 io = io
2959
2960class PyMiscIOTest(MiscIOTest):
2961 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002962
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002963
2964@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2965class SignalsTest(unittest.TestCase):
2966
2967 def setUp(self):
2968 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2969
2970 def tearDown(self):
2971 signal.signal(signal.SIGALRM, self.oldalrm)
2972
2973 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002974 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002975
2976 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002977 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2978 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002979 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2980 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002981 invokes the signal handler, and bubbles up the exception raised
2982 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002983 read_results = []
2984 def _read():
2985 s = os.read(r, 1)
2986 read_results.append(s)
2987 t = threading.Thread(target=_read)
2988 t.daemon = True
2989 r, w = os.pipe()
2990 try:
2991 wio = self.io.open(w, **fdopen_kwargs)
2992 t.start()
2993 signal.alarm(1)
2994 # Fill the pipe enough that the write will be blocking.
2995 # It will be interrupted by the timer armed above. Since the
2996 # other thread has read one byte, the low-level write will
2997 # return with a successful (partial) result rather than an EINTR.
2998 # The buffered IO layer must check for pending signal
2999 # handlers, which in this case will invoke alarm_interrupt().
3000 self.assertRaises(ZeroDivisionError,
Antoine Pitrou68915d72013-04-24 23:31:38 +02003001 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003002 t.join()
3003 # We got one byte, get another one and check that it isn't a
3004 # repeat of the first one.
3005 read_results.append(os.read(r, 1))
3006 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3007 finally:
3008 os.close(w)
3009 os.close(r)
3010 # This is deliberate. If we didn't close the file descriptor
3011 # before closing wio, wio would try to flush its internal
3012 # buffer, and block again.
3013 try:
3014 wio.close()
3015 except IOError as e:
3016 if e.errno != errno.EBADF:
3017 raise
3018
3019 def test_interrupted_write_unbuffered(self):
3020 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3021
3022 def test_interrupted_write_buffered(self):
3023 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3024
3025 def test_interrupted_write_text(self):
3026 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3027
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003028 def check_reentrant_write(self, data, **fdopen_kwargs):
3029 def on_alarm(*args):
3030 # Will be called reentrantly from the same thread
3031 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02003032 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003033 signal.signal(signal.SIGALRM, on_alarm)
3034 r, w = os.pipe()
3035 wio = self.io.open(w, **fdopen_kwargs)
3036 try:
3037 signal.alarm(1)
3038 # Either the reentrant call to wio.write() fails with RuntimeError,
3039 # or the signal handler raises ZeroDivisionError.
3040 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3041 while 1:
3042 for i in range(100):
3043 wio.write(data)
3044 wio.flush()
3045 # Make sure the buffer doesn't fill up and block further writes
3046 os.read(r, len(data) * 100)
3047 exc = cm.exception
3048 if isinstance(exc, RuntimeError):
3049 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3050 finally:
3051 wio.close()
3052 os.close(r)
3053
3054 def test_reentrant_write_buffered(self):
3055 self.check_reentrant_write(b"xy", mode="wb")
3056
3057 def test_reentrant_write_text(self):
3058 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3059
Antoine Pitrou6439c002011-02-25 21:35:47 +00003060 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3061 """Check that a buffered read, when it gets interrupted (either
3062 returning a partial result or EINTR), properly invokes the signal
3063 handler and retries if the latter returned successfully."""
3064 r, w = os.pipe()
3065 fdopen_kwargs["closefd"] = False
3066 def alarm_handler(sig, frame):
3067 os.write(w, b"bar")
3068 signal.signal(signal.SIGALRM, alarm_handler)
3069 try:
3070 rio = self.io.open(r, **fdopen_kwargs)
3071 os.write(w, b"foo")
3072 signal.alarm(1)
3073 # Expected behaviour:
3074 # - first raw read() returns partial b"foo"
3075 # - second raw read() returns EINTR
3076 # - third raw read() returns b"bar"
3077 self.assertEqual(decode(rio.read(6)), "foobar")
3078 finally:
3079 rio.close()
3080 os.close(w)
3081 os.close(r)
3082
3083 def test_interrupterd_read_retry_buffered(self):
3084 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3085 mode="rb")
3086
3087 def test_interrupterd_read_retry_text(self):
3088 self.check_interrupted_read_retry(lambda x: x,
3089 mode="r")
3090
3091 @unittest.skipUnless(threading, 'Threading required for this test.')
3092 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3093 """Check that a buffered write, when it gets interrupted (either
3094 returning a partial result or EINTR), properly invokes the signal
3095 handler and retries if the latter returned successfully."""
3096 select = support.import_module("select")
3097 # A quantity that exceeds the buffer size of an anonymous pipe's
3098 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003099 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003100 r, w = os.pipe()
3101 fdopen_kwargs["closefd"] = False
3102 # We need a separate thread to read from the pipe and allow the
3103 # write() to finish. This thread is started after the SIGALRM is
3104 # received (forcing a first EINTR in write()).
3105 read_results = []
3106 write_finished = False
3107 def _read():
3108 while not write_finished:
3109 while r in select.select([r], [], [], 1.0)[0]:
3110 s = os.read(r, 1024)
3111 read_results.append(s)
3112 t = threading.Thread(target=_read)
3113 t.daemon = True
3114 def alarm1(sig, frame):
3115 signal.signal(signal.SIGALRM, alarm2)
3116 signal.alarm(1)
3117 def alarm2(sig, frame):
3118 t.start()
3119 signal.signal(signal.SIGALRM, alarm1)
3120 try:
3121 wio = self.io.open(w, **fdopen_kwargs)
3122 signal.alarm(1)
3123 # Expected behaviour:
3124 # - first raw write() is partial (because of the limited pipe buffer
3125 # and the first alarm)
3126 # - second raw write() returns EINTR (because of the second alarm)
3127 # - subsequent write()s are successful (either partial or complete)
3128 self.assertEqual(N, wio.write(item * N))
3129 wio.flush()
3130 write_finished = True
3131 t.join()
3132 self.assertEqual(N, sum(len(x) for x in read_results))
3133 finally:
3134 write_finished = True
3135 os.close(w)
3136 os.close(r)
3137 # This is deliberate. If we didn't close the file descriptor
3138 # before closing wio, wio would try to flush its internal
3139 # buffer, and could block (in case of failure).
3140 try:
3141 wio.close()
3142 except IOError as e:
3143 if e.errno != errno.EBADF:
3144 raise
3145
3146 def test_interrupterd_write_retry_buffered(self):
3147 self.check_interrupted_write_retry(b"x", mode="wb")
3148
3149 def test_interrupterd_write_retry_text(self):
3150 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3151
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003152
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003153class CSignalsTest(SignalsTest):
3154 io = io
3155
3156class PySignalsTest(SignalsTest):
3157 io = pyio
3158
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003159 # Handling reentrancy issues would slow down _pyio even more, so the
3160 # tests are disabled.
3161 test_reentrant_write_buffered = None
3162 test_reentrant_write_text = None
3163
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003164
Christian Heimes1a6387e2008-03-26 12:49:49 +00003165def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003166 tests = (CIOTest, PyIOTest,
3167 CBufferedReaderTest, PyBufferedReaderTest,
3168 CBufferedWriterTest, PyBufferedWriterTest,
3169 CBufferedRWPairTest, PyBufferedRWPairTest,
3170 CBufferedRandomTest, PyBufferedRandomTest,
3171 StatefulIncrementalDecoderTest,
3172 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3173 CTextIOWrapperTest, PyTextIOWrapperTest,
3174 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003175 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003176 )
3177
3178 # Put the namespaces of the IO module we are testing and some useful mock
3179 # classes in the __dict__ of each test.
3180 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003181 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003182 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3183 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3184 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3185 globs = globals()
3186 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3187 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3188 # Avoid turning open into a bound method.
3189 py_io_ns["open"] = pyio.OpenWrapper
3190 for test in tests:
3191 if test.__name__.startswith("C"):
3192 for name, obj in c_io_ns.items():
3193 setattr(test, name, obj)
3194 elif test.__name__.startswith("Py"):
3195 for name, obj in py_io_ns.items():
3196 setattr(test, name, obj)
3197
3198 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003199
3200if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003201 test_main()