blob: ba3701a4682769c0d0baa372e48ab807966d8892 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
Antoine Pitrou6391b342010-09-14 18:48:19 +000063class MockRawIOWithoutRead:
64 """A RawIO implementation without read(), so as to exercise the default
65 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000066
67 def __init__(self, read_stack=()):
68 self._read_stack = list(read_stack)
69 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000070 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000071 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000072
Christian Heimes1a6387e2008-03-26 12:49:49 +000073 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000074 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000075 return len(b)
76
77 def writable(self):
78 return True
79
80 def fileno(self):
81 return 42
82
83 def readable(self):
84 return True
85
86 def seekable(self):
87 return True
88
89 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000091
92 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # same comment as above
94
95 def readinto(self, buf):
96 self._reads += 1
97 max_len = len(buf)
98 try:
99 data = self._read_stack[0]
100 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000101 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000102 return 0
103 if data is None:
104 del self._read_stack[0]
105 return None
106 n = len(data)
107 if len(data) <= max_len:
108 del self._read_stack[0]
109 buf[:n] = data
110 return n
111 else:
112 buf[:] = data[:max_len]
113 self._read_stack[0] = data[max_len:]
114 return max_len
115
116 def truncate(self, pos=None):
117 return pos
118
Antoine Pitrou6391b342010-09-14 18:48:19 +0000119class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
120 pass
121
122class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
123 pass
124
125
126class MockRawIO(MockRawIOWithoutRead):
127
128 def read(self, n=None):
129 self._reads += 1
130 try:
131 return self._read_stack.pop(0)
132 except:
133 self._extraneous_reads += 1
134 return b""
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class CMockRawIO(MockRawIO, io.RawIOBase):
137 pass
138
139class PyMockRawIO(MockRawIO, pyio.RawIOBase):
140 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
Antoine Pitrou19690592009-06-12 20:14:08 +0000143class MisbehavedRawIO(MockRawIO):
144 def write(self, b):
145 return MockRawIO.write(self, b) * 2
146
147 def read(self, n=None):
148 return MockRawIO.read(self, n) * 2
149
150 def seek(self, pos, whence):
151 return -123
152
153 def tell(self):
154 return -456
155
156 def readinto(self, buf):
157 MockRawIO.readinto(self, buf)
158 return len(buf) * 5
159
160class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
161 pass
162
163class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
164 pass
165
166
167class CloseFailureIO(MockRawIO):
168 closed = 0
169
170 def close(self):
171 if not self.closed:
172 self.closed = 1
173 raise IOError
174
175class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
176 pass
177
178class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
179 pass
180
181
182class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183
184 def __init__(self, data):
185 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
188 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190 self.read_history.append(None if res is None else len(res))
191 return res
192
Antoine Pitrou19690592009-06-12 20:14:08 +0000193 def readinto(self, b):
194 res = super(MockFileIO, self).readinto(b)
195 self.read_history.append(res)
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class CMockFileIO(MockFileIO, io.BytesIO):
199 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class PyMockFileIO(MockFileIO, pyio.BytesIO):
202 pass
203
204
205class MockNonBlockWriterIO:
206
207 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000208 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000209 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000210
Antoine Pitrou19690592009-06-12 20:14:08 +0000211 def pop_written(self):
212 s = b"".join(self._write_stack)
213 self._write_stack[:] = []
214 return s
215
216 def block_on(self, char):
217 """Block when a given char is encountered."""
218 self._blocker_char = char
219
220 def readable(self):
221 return True
222
223 def seekable(self):
224 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
226 def writable(self):
227 return True
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def write(self, b):
230 b = bytes(b)
231 n = -1
232 if self._blocker_char:
233 try:
234 n = b.index(self._blocker_char)
235 except ValueError:
236 pass
237 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100238 if n > 0:
239 # write data up to the first blocker
240 self._write_stack.append(b[:n])
241 return n
242 else:
243 # cancel blocker and indicate would block
244 self._blocker_char = None
245 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000429 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000547 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000567 def test_flush_error_on_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 def bad_flush():
570 raise IOError()
571 f.flush = bad_flush
572 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600573 self.assertTrue(f.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574
575 def test_multi_close(self):
576 f = self.open(support.TESTFN, "wb", buffering=0)
577 f.close()
578 f.close()
579 f.close()
580 self.assertRaises(ValueError, f.flush)
581
Antoine Pitrou6391b342010-09-14 18:48:19 +0000582 def test_RawIOBase_read(self):
583 # Exercise the default RawIOBase.read() implementation (which calls
584 # readinto() internally).
585 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
586 self.assertEqual(rawio.read(2), b"ab")
587 self.assertEqual(rawio.read(2), b"c")
588 self.assertEqual(rawio.read(2), b"d")
589 self.assertEqual(rawio.read(2), None)
590 self.assertEqual(rawio.read(2), b"ef")
591 self.assertEqual(rawio.read(2), b"g")
592 self.assertEqual(rawio.read(2), None)
593 self.assertEqual(rawio.read(2), b"")
594
Hynek Schlawack877effc2012-05-25 09:24:18 +0200595 def test_fileio_closefd(self):
596 # Issue #4841
597 with self.open(__file__, 'rb') as f1, \
598 self.open(__file__, 'rb') as f2:
599 fileio = self.FileIO(f1.fileno(), closefd=False)
600 # .__init__() must not close f1
601 fileio.__init__(f2.fileno(), closefd=False)
602 f1.readline()
603 # .close() must not close f2
604 fileio.close()
605 f2.readline()
606
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300607 def test_nonbuffered_textio(self):
608 with warnings.catch_warnings(record=True) as recorded:
609 with self.assertRaises(ValueError):
610 self.open(support.TESTFN, 'w', buffering=0)
611 support.gc_collect()
612 self.assertEqual(recorded, [])
613
614 def test_invalid_newline(self):
615 with warnings.catch_warnings(record=True) as recorded:
616 with self.assertRaises(ValueError):
617 self.open(support.TESTFN, 'w', newline='invalid')
618 support.gc_collect()
619 self.assertEqual(recorded, [])
620
Hynek Schlawack877effc2012-05-25 09:24:18 +0200621
Antoine Pitrou19690592009-06-12 20:14:08 +0000622class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200623
624 def test_IOBase_finalize(self):
625 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
626 # class which inherits IOBase and an object of this class are caught
627 # in a reference cycle and close() is already in the method cache.
628 class MyIO(self.IOBase):
629 def close(self):
630 pass
631
632 # create an instance to populate the method cache
633 MyIO()
634 obj = MyIO()
635 obj.obj = obj
636 wr = weakref.ref(obj)
637 del MyIO
638 del obj
639 support.gc_collect()
640 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000641
Antoine Pitrou19690592009-06-12 20:14:08 +0000642class PyIOTest(IOTest):
643 test_array_writes = unittest.skip(
644 "len(array.array) returns number of elements rather than bytelength"
645 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000646
647
Antoine Pitrou19690592009-06-12 20:14:08 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
Benjamin Peterson53ae6142014-12-21 20:51:50 -0600657 repr(buf) # Should still work
658
Antoine Pitrou19690592009-06-12 20:14:08 +0000659 def test_fileno(self):
660 rawio = self.MockRawIO()
661 bufio = self.tp(rawio)
662
Ezio Melotti2623a372010-11-21 13:34:58 +0000663 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000664
Zachary Ware1f702212013-12-10 14:09:20 -0600665 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000666 def test_no_fileno(self):
667 # XXX will we always have fileno() function? If so, kill
668 # this test. Else, write it.
669 pass
670
671 def test_invalid_args(self):
672 rawio = self.MockRawIO()
673 bufio = self.tp(rawio)
674 # Invalid whence
675 self.assertRaises(ValueError, bufio.seek, 0, -1)
676 self.assertRaises(ValueError, bufio.seek, 0, 3)
677
678 def test_override_destructor(self):
679 tp = self.tp
680 record = []
681 class MyBufferedIO(tp):
682 def __del__(self):
683 record.append(1)
684 try:
685 f = super(MyBufferedIO, self).__del__
686 except AttributeError:
687 pass
688 else:
689 f()
690 def close(self):
691 record.append(2)
692 super(MyBufferedIO, self).close()
693 def flush(self):
694 record.append(3)
695 super(MyBufferedIO, self).flush()
696 rawio = self.MockRawIO()
697 bufio = MyBufferedIO(rawio)
698 writable = bufio.writable()
699 del bufio
700 support.gc_collect()
701 if writable:
702 self.assertEqual(record, [1, 2, 3])
703 else:
704 self.assertEqual(record, [1, 2])
705
706 def test_context_manager(self):
707 # Test usability as a context manager
708 rawio = self.MockRawIO()
709 bufio = self.tp(rawio)
710 def _with():
711 with bufio:
712 pass
713 _with()
714 # bufio should now be closed, and using it a second time should raise
715 # a ValueError.
716 self.assertRaises(ValueError, _with)
717
718 def test_error_through_destructor(self):
719 # Test that the exception state is not modified by a destructor,
720 # even if close() fails.
721 rawio = self.CloseFailureIO()
722 def f():
723 self.tp(rawio).xyzzy
724 with support.captured_output("stderr") as s:
725 self.assertRaises(AttributeError, f)
726 s = s.getvalue().strip()
727 if s:
728 # The destructor *may* have printed an unraisable error, check it
729 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000730 self.assertTrue(s.startswith("Exception IOError: "), s)
731 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000732
733 def test_repr(self):
734 raw = self.MockRawIO()
735 b = self.tp(raw)
736 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
737 self.assertEqual(repr(b), "<%s>" % clsname)
738 raw.name = "dummy"
739 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
740 raw.name = b"dummy"
741 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000742
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000743 def test_flush_error_on_close(self):
744 raw = self.MockRawIO()
745 def bad_flush():
746 raise IOError()
747 raw.flush = bad_flush
748 b = self.tp(raw)
749 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600750 self.assertTrue(b.closed)
751
752 def test_close_error_on_close(self):
753 raw = self.MockRawIO()
754 def bad_flush():
755 raise IOError('flush')
756 def bad_close():
757 raise IOError('close')
758 raw.close = bad_close
759 b = self.tp(raw)
760 b.flush = bad_flush
761 with self.assertRaises(IOError) as err: # exception not swallowed
762 b.close()
763 self.assertEqual(err.exception.args, ('close',))
764 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000765
766 def test_multi_close(self):
767 raw = self.MockRawIO()
768 b = self.tp(raw)
769 b.close()
770 b.close()
771 b.close()
772 self.assertRaises(ValueError, b.flush)
773
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000774 def test_readonly_attributes(self):
775 raw = self.MockRawIO()
776 buf = self.tp(raw)
777 x = self.MockRawIO()
778 with self.assertRaises((AttributeError, TypeError)):
779 buf.raw = x
780
Christian Heimes1a6387e2008-03-26 12:49:49 +0000781
Antoine Pitroubff5df02012-07-29 19:02:46 +0200782class SizeofTest:
783
784 @support.cpython_only
785 def test_sizeof(self):
786 bufsize1 = 4096
787 bufsize2 = 8192
788 rawio = self.MockRawIO()
789 bufio = self.tp(rawio, buffer_size=bufsize1)
790 size = sys.getsizeof(bufio) - bufsize1
791 rawio = self.MockRawIO()
792 bufio = self.tp(rawio, buffer_size=bufsize2)
793 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
794
795
Antoine Pitrou19690592009-06-12 20:14:08 +0000796class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
797 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000798
Antoine Pitrou19690592009-06-12 20:14:08 +0000799 def test_constructor(self):
800 rawio = self.MockRawIO([b"abc"])
801 bufio = self.tp(rawio)
802 bufio.__init__(rawio)
803 bufio.__init__(rawio, buffer_size=1024)
804 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000805 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000806 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
807 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
808 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
809 rawio = self.MockRawIO([b"abc"])
810 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000811 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000812
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200813 def test_uninitialized(self):
814 bufio = self.tp.__new__(self.tp)
815 del bufio
816 bufio = self.tp.__new__(self.tp)
817 self.assertRaisesRegexp((ValueError, AttributeError),
818 'uninitialized|has no attribute',
819 bufio.read, 0)
820 bufio.__init__(self.MockRawIO())
821 self.assertEqual(bufio.read(0), b'')
822
Antoine Pitrou19690592009-06-12 20:14:08 +0000823 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000824 for arg in (None, 7):
825 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
826 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000827 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000828 # Invalid args
829 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000830
Antoine Pitrou19690592009-06-12 20:14:08 +0000831 def test_read1(self):
832 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
833 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000834 self.assertEqual(b"a", bufio.read(1))
835 self.assertEqual(b"b", bufio.read1(1))
836 self.assertEqual(rawio._reads, 1)
837 self.assertEqual(b"c", bufio.read1(100))
838 self.assertEqual(rawio._reads, 1)
839 self.assertEqual(b"d", bufio.read1(100))
840 self.assertEqual(rawio._reads, 2)
841 self.assertEqual(b"efg", bufio.read1(100))
842 self.assertEqual(rawio._reads, 3)
843 self.assertEqual(b"", bufio.read1(100))
844 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000845 # Invalid args
846 self.assertRaises(ValueError, bufio.read1, -1)
847
848 def test_readinto(self):
849 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
850 bufio = self.tp(rawio)
851 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000852 self.assertEqual(bufio.readinto(b), 2)
853 self.assertEqual(b, b"ab")
854 self.assertEqual(bufio.readinto(b), 2)
855 self.assertEqual(b, b"cd")
856 self.assertEqual(bufio.readinto(b), 2)
857 self.assertEqual(b, b"ef")
858 self.assertEqual(bufio.readinto(b), 1)
859 self.assertEqual(b, b"gf")
860 self.assertEqual(bufio.readinto(b), 0)
861 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000862
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000863 def test_readlines(self):
864 def bufio():
865 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
866 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000867 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
868 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
869 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000870
Antoine Pitrou19690592009-06-12 20:14:08 +0000871 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000872 data = b"abcdefghi"
873 dlen = len(data)
874
875 tests = [
876 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
877 [ 100, [ 3, 3, 3], [ dlen ] ],
878 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
879 ]
880
881 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000882 rawio = self.MockFileIO(data)
883 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000884 pos = 0
885 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000886 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000887 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000888 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000889 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000890
Antoine Pitrou19690592009-06-12 20:14:08 +0000891 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000892 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000893 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
894 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000895 self.assertEqual(b"abcd", bufio.read(6))
896 self.assertEqual(b"e", bufio.read(1))
897 self.assertEqual(b"fg", bufio.read())
898 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200899 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000900 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000901
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200902 rawio = self.MockRawIO((b"a", None, None))
903 self.assertEqual(b"a", rawio.readall())
904 self.assertIsNone(rawio.readall())
905
Antoine Pitrou19690592009-06-12 20:14:08 +0000906 def test_read_past_eof(self):
907 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
908 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000909
Ezio Melotti2623a372010-11-21 13:34:58 +0000910 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000911
Antoine Pitrou19690592009-06-12 20:14:08 +0000912 def test_read_all(self):
913 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
914 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000915
Ezio Melotti2623a372010-11-21 13:34:58 +0000916 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000917
Victor Stinner6a102812010-04-27 23:55:59 +0000918 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000919 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000920 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000921 try:
922 # Write out many bytes with exactly the same number of 0's,
923 # 1's... 255's. This will help us check that concurrent reading
924 # doesn't duplicate or forget contents.
925 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000926 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000927 random.shuffle(l)
928 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000929 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000930 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000931 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000932 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000933 errors = []
934 results = []
935 def f():
936 try:
937 # Intra-buffer read then buffer-flushing read
938 for n in cycle([1, 19]):
939 s = bufio.read(n)
940 if not s:
941 break
942 # list.append() is atomic
943 results.append(s)
944 except Exception as e:
945 errors.append(e)
946 raise
947 threads = [threading.Thread(target=f) for x in range(20)]
948 for t in threads:
949 t.start()
950 time.sleep(0.02) # yield
951 for t in threads:
952 t.join()
953 self.assertFalse(errors,
954 "the following exceptions were caught: %r" % errors)
955 s = b''.join(results)
956 for i in range(256):
957 c = bytes(bytearray([i]))
958 self.assertEqual(s.count(c), N)
959 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 support.unlink(support.TESTFN)
961
962 def test_misbehaved_io(self):
963 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
964 bufio = self.tp(rawio)
965 self.assertRaises(IOError, bufio.seek, 0)
966 self.assertRaises(IOError, bufio.tell)
967
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000968 def test_no_extraneous_read(self):
969 # Issue #9550; when the raw IO object has satisfied the read request,
970 # we should not issue any additional reads, otherwise it may block
971 # (e.g. socket).
972 bufsize = 16
973 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
974 rawio = self.MockRawIO([b"x" * n])
975 bufio = self.tp(rawio, bufsize)
976 self.assertEqual(bufio.read(n), b"x" * n)
977 # Simple case: one raw read is enough to satisfy the request.
978 self.assertEqual(rawio._extraneous_reads, 0,
979 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
980 # A more complex case where two raw reads are needed to satisfy
981 # the request.
982 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
983 bufio = self.tp(rawio, bufsize)
984 self.assertEqual(bufio.read(n), b"x" * n)
985 self.assertEqual(rawio._extraneous_reads, 0,
986 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
987
988
Antoine Pitroubff5df02012-07-29 19:02:46 +0200989class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +0000990 tp = io.BufferedReader
991
992 def test_constructor(self):
993 BufferedReaderTest.test_constructor(self)
994 # The allocation can succeed on 32-bit builds, e.g. with more
995 # than 2GB RAM and a 64-bit kernel.
996 if sys.maxsize > 0x7FFFFFFF:
997 rawio = self.MockRawIO()
998 bufio = self.tp(rawio)
999 self.assertRaises((OverflowError, MemoryError, ValueError),
1000 bufio.__init__, rawio, sys.maxsize)
1001
1002 def test_initialization(self):
1003 rawio = self.MockRawIO([b"abc"])
1004 bufio = self.tp(rawio)
1005 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1006 self.assertRaises(ValueError, bufio.read)
1007 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1008 self.assertRaises(ValueError, bufio.read)
1009 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1010 self.assertRaises(ValueError, bufio.read)
1011
1012 def test_misbehaved_io_read(self):
1013 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1014 bufio = self.tp(rawio)
1015 # _pyio.BufferedReader seems to implement reading different, so that
1016 # checking this is not so easy.
1017 self.assertRaises(IOError, bufio.read, 10)
1018
1019 def test_garbage_collection(self):
1020 # C BufferedReader objects are collected.
1021 # The Python version has __del__, so it ends into gc.garbage instead
1022 rawio = self.FileIO(support.TESTFN, "w+b")
1023 f = self.tp(rawio)
1024 f.f = f
1025 wr = weakref.ref(f)
1026 del f
1027 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001028 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001029
R David Murray5b2cf5e2013-02-23 22:11:21 -05001030 def test_args_error(self):
1031 # Issue #17275
1032 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1033 self.tp(io.BytesIO(), 1024, 1024, 1024)
1034
1035
Antoine Pitrou19690592009-06-12 20:14:08 +00001036class PyBufferedReaderTest(BufferedReaderTest):
1037 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001038
1039
Antoine Pitrou19690592009-06-12 20:14:08 +00001040class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1041 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001042
Antoine Pitrou19690592009-06-12 20:14:08 +00001043 def test_constructor(self):
1044 rawio = self.MockRawIO()
1045 bufio = self.tp(rawio)
1046 bufio.__init__(rawio)
1047 bufio.__init__(rawio, buffer_size=1024)
1048 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001049 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001050 bufio.flush()
1051 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1052 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1053 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1054 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001055 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001056 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001057 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001058
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001059 def test_uninitialized(self):
1060 bufio = self.tp.__new__(self.tp)
1061 del bufio
1062 bufio = self.tp.__new__(self.tp)
1063 self.assertRaisesRegexp((ValueError, AttributeError),
1064 'uninitialized|has no attribute',
1065 bufio.write, b'')
1066 bufio.__init__(self.MockRawIO())
1067 self.assertEqual(bufio.write(b''), 0)
1068
Antoine Pitrou19690592009-06-12 20:14:08 +00001069 def test_detach_flush(self):
1070 raw = self.MockRawIO()
1071 buf = self.tp(raw)
1072 buf.write(b"howdy!")
1073 self.assertFalse(raw._write_stack)
1074 buf.detach()
1075 self.assertEqual(raw._write_stack, [b"howdy!"])
1076
1077 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001078 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001079 writer = self.MockRawIO()
1080 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001081 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001082 self.assertFalse(writer._write_stack)
1083
Antoine Pitrou19690592009-06-12 20:14:08 +00001084 def test_write_overflow(self):
1085 writer = self.MockRawIO()
1086 bufio = self.tp(writer, 8)
1087 contents = b"abcdefghijklmnop"
1088 for n in range(0, len(contents), 3):
1089 bufio.write(contents[n:n+3])
1090 flushed = b"".join(writer._write_stack)
1091 # At least (total - 8) bytes were implicitly flushed, perhaps more
1092 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001093 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001094
Antoine Pitrou19690592009-06-12 20:14:08 +00001095 def check_writes(self, intermediate_func):
1096 # Lots of writes, test the flushed output is as expected.
1097 contents = bytes(range(256)) * 1000
1098 n = 0
1099 writer = self.MockRawIO()
1100 bufio = self.tp(writer, 13)
1101 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1102 def gen_sizes():
1103 for size in count(1):
1104 for i in range(15):
1105 yield size
1106 sizes = gen_sizes()
1107 while n < len(contents):
1108 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001109 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001110 intermediate_func(bufio)
1111 n += size
1112 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001113 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001114 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001115
Antoine Pitrou19690592009-06-12 20:14:08 +00001116 def test_writes(self):
1117 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001118
Antoine Pitrou19690592009-06-12 20:14:08 +00001119 def test_writes_and_flushes(self):
1120 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001121
Antoine Pitrou19690592009-06-12 20:14:08 +00001122 def test_writes_and_seeks(self):
1123 def _seekabs(bufio):
1124 pos = bufio.tell()
1125 bufio.seek(pos + 1, 0)
1126 bufio.seek(pos - 1, 0)
1127 bufio.seek(pos, 0)
1128 self.check_writes(_seekabs)
1129 def _seekrel(bufio):
1130 pos = bufio.seek(0, 1)
1131 bufio.seek(+1, 1)
1132 bufio.seek(-1, 1)
1133 bufio.seek(pos, 0)
1134 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001135
Antoine Pitrou19690592009-06-12 20:14:08 +00001136 def test_writes_and_truncates(self):
1137 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001138
Antoine Pitrou19690592009-06-12 20:14:08 +00001139 def test_write_non_blocking(self):
1140 raw = self.MockNonBlockWriterIO()
1141 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001142
Ezio Melotti2623a372010-11-21 13:34:58 +00001143 self.assertEqual(bufio.write(b"abcd"), 4)
1144 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001145 # 1 byte will be written, the rest will be buffered
1146 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001147 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001148
Antoine Pitrou19690592009-06-12 20:14:08 +00001149 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1150 raw.block_on(b"0")
1151 try:
1152 bufio.write(b"opqrwxyz0123456789")
1153 except self.BlockingIOError as e:
1154 written = e.characters_written
1155 else:
1156 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001157 self.assertEqual(written, 16)
1158 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001159 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001160
Ezio Melotti2623a372010-11-21 13:34:58 +00001161 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001162 s = raw.pop_written()
1163 # Previously buffered bytes were flushed
1164 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001165
Antoine Pitrou19690592009-06-12 20:14:08 +00001166 def test_write_and_rewind(self):
1167 raw = io.BytesIO()
1168 bufio = self.tp(raw, 4)
1169 self.assertEqual(bufio.write(b"abcdef"), 6)
1170 self.assertEqual(bufio.tell(), 6)
1171 bufio.seek(0, 0)
1172 self.assertEqual(bufio.write(b"XY"), 2)
1173 bufio.seek(6, 0)
1174 self.assertEqual(raw.getvalue(), b"XYcdef")
1175 self.assertEqual(bufio.write(b"123456"), 6)
1176 bufio.flush()
1177 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001178
Antoine Pitrou19690592009-06-12 20:14:08 +00001179 def test_flush(self):
1180 writer = self.MockRawIO()
1181 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001182 bufio.write(b"abc")
1183 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001184 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001185
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001186 def test_writelines(self):
1187 l = [b'ab', b'cd', b'ef']
1188 writer = self.MockRawIO()
1189 bufio = self.tp(writer, 8)
1190 bufio.writelines(l)
1191 bufio.flush()
1192 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1193
1194 def test_writelines_userlist(self):
1195 l = UserList([b'ab', b'cd', b'ef'])
1196 writer = self.MockRawIO()
1197 bufio = self.tp(writer, 8)
1198 bufio.writelines(l)
1199 bufio.flush()
1200 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1201
1202 def test_writelines_error(self):
1203 writer = self.MockRawIO()
1204 bufio = self.tp(writer, 8)
1205 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1206 self.assertRaises(TypeError, bufio.writelines, None)
1207
Antoine Pitrou19690592009-06-12 20:14:08 +00001208 def test_destructor(self):
1209 writer = self.MockRawIO()
1210 bufio = self.tp(writer, 8)
1211 bufio.write(b"abc")
1212 del bufio
1213 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001214 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001215
1216 def test_truncate(self):
1217 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001218 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001219 bufio = self.tp(raw, 8)
1220 bufio.write(b"abcdef")
1221 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001222 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001223 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001224 self.assertEqual(f.read(), b"abc")
1225
Victor Stinner6a102812010-04-27 23:55:59 +00001226 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001227 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001228 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001229 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001230 # Write out many bytes from many threads and test they were
1231 # all flushed.
1232 N = 1000
1233 contents = bytes(range(256)) * N
1234 sizes = cycle([1, 19])
1235 n = 0
1236 queue = deque()
1237 while n < len(contents):
1238 size = next(sizes)
1239 queue.append(contents[n:n+size])
1240 n += size
1241 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001242 # We use a real file object because it allows us to
1243 # exercise situations where the GIL is released before
1244 # writing the buffer to the raw streams. This is in addition
1245 # to concurrency issues due to switching threads in the middle
1246 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001247 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001248 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001249 errors = []
1250 def f():
1251 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001252 while True:
1253 try:
1254 s = queue.popleft()
1255 except IndexError:
1256 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001257 bufio.write(s)
1258 except Exception as e:
1259 errors.append(e)
1260 raise
1261 threads = [threading.Thread(target=f) for x in range(20)]
1262 for t in threads:
1263 t.start()
1264 time.sleep(0.02) # yield
1265 for t in threads:
1266 t.join()
1267 self.assertFalse(errors,
1268 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001269 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001270 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001271 s = f.read()
1272 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001273 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001274 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001275 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001276
Antoine Pitrou19690592009-06-12 20:14:08 +00001277 def test_misbehaved_io(self):
1278 rawio = self.MisbehavedRawIO()
1279 bufio = self.tp(rawio, 5)
1280 self.assertRaises(IOError, bufio.seek, 0)
1281 self.assertRaises(IOError, bufio.tell)
1282 self.assertRaises(IOError, bufio.write, b"abcdef")
1283
1284 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001285 with support.check_warnings(("max_buffer_size is deprecated",
1286 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001287 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001288
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001289 def test_write_error_on_close(self):
1290 raw = self.MockRawIO()
1291 def bad_write(b):
1292 raise IOError()
1293 raw.write = bad_write
1294 b = self.tp(raw)
1295 b.write(b'spam')
1296 self.assertRaises(IOError, b.close) # exception not swallowed
1297 self.assertTrue(b.closed)
1298
Antoine Pitrou19690592009-06-12 20:14:08 +00001299
Antoine Pitroubff5df02012-07-29 19:02:46 +02001300class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001301 tp = io.BufferedWriter
1302
1303 def test_constructor(self):
1304 BufferedWriterTest.test_constructor(self)
1305 # The allocation can succeed on 32-bit builds, e.g. with more
1306 # than 2GB RAM and a 64-bit kernel.
1307 if sys.maxsize > 0x7FFFFFFF:
1308 rawio = self.MockRawIO()
1309 bufio = self.tp(rawio)
1310 self.assertRaises((OverflowError, MemoryError, ValueError),
1311 bufio.__init__, rawio, sys.maxsize)
1312
1313 def test_initialization(self):
1314 rawio = self.MockRawIO()
1315 bufio = self.tp(rawio)
1316 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1317 self.assertRaises(ValueError, bufio.write, b"def")
1318 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1319 self.assertRaises(ValueError, bufio.write, b"def")
1320 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1321 self.assertRaises(ValueError, bufio.write, b"def")
1322
1323 def test_garbage_collection(self):
1324 # C BufferedWriter objects are collected, and collecting them flushes
1325 # all data to disk.
1326 # The Python version has __del__, so it ends into gc.garbage instead
1327 rawio = self.FileIO(support.TESTFN, "w+b")
1328 f = self.tp(rawio)
1329 f.write(b"123xxx")
1330 f.x = f
1331 wr = weakref.ref(f)
1332 del f
1333 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001334 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001335 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001336 self.assertEqual(f.read(), b"123xxx")
1337
R David Murray5b2cf5e2013-02-23 22:11:21 -05001338 def test_args_error(self):
1339 # Issue #17275
1340 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1341 self.tp(io.BytesIO(), 1024, 1024, 1024)
1342
Antoine Pitrou19690592009-06-12 20:14:08 +00001343
1344class PyBufferedWriterTest(BufferedWriterTest):
1345 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001346
1347class BufferedRWPairTest(unittest.TestCase):
1348
Antoine Pitrou19690592009-06-12 20:14:08 +00001349 def test_constructor(self):
1350 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001351 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001352
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001353 def test_uninitialized(self):
1354 pair = self.tp.__new__(self.tp)
1355 del pair
1356 pair = self.tp.__new__(self.tp)
1357 self.assertRaisesRegexp((ValueError, AttributeError),
1358 'uninitialized|has no attribute',
1359 pair.read, 0)
1360 self.assertRaisesRegexp((ValueError, AttributeError),
1361 'uninitialized|has no attribute',
1362 pair.write, b'')
1363 pair.__init__(self.MockRawIO(), self.MockRawIO())
1364 self.assertEqual(pair.read(0), b'')
1365 self.assertEqual(pair.write(b''), 0)
1366
Antoine Pitrou19690592009-06-12 20:14:08 +00001367 def test_detach(self):
1368 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1369 self.assertRaises(self.UnsupportedOperation, pair.detach)
1370
1371 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001372 with support.check_warnings(("max_buffer_size is deprecated",
1373 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001374 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001375
1376 def test_constructor_with_not_readable(self):
1377 class NotReadable(MockRawIO):
1378 def readable(self):
1379 return False
1380
1381 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1382
1383 def test_constructor_with_not_writeable(self):
1384 class NotWriteable(MockRawIO):
1385 def writable(self):
1386 return False
1387
1388 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1389
1390 def test_read(self):
1391 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1392
1393 self.assertEqual(pair.read(3), b"abc")
1394 self.assertEqual(pair.read(1), b"d")
1395 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001396 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1397 self.assertEqual(pair.read(None), b"abc")
1398
1399 def test_readlines(self):
1400 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1401 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1402 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1403 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001404
1405 def test_read1(self):
1406 # .read1() is delegated to the underlying reader object, so this test
1407 # can be shallow.
1408 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1409
1410 self.assertEqual(pair.read1(3), b"abc")
1411
1412 def test_readinto(self):
1413 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1414
1415 data = bytearray(5)
1416 self.assertEqual(pair.readinto(data), 5)
1417 self.assertEqual(data, b"abcde")
1418
1419 def test_write(self):
1420 w = self.MockRawIO()
1421 pair = self.tp(self.MockRawIO(), w)
1422
1423 pair.write(b"abc")
1424 pair.flush()
1425 pair.write(b"def")
1426 pair.flush()
1427 self.assertEqual(w._write_stack, [b"abc", b"def"])
1428
1429 def test_peek(self):
1430 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1431
1432 self.assertTrue(pair.peek(3).startswith(b"abc"))
1433 self.assertEqual(pair.read(3), b"abc")
1434
1435 def test_readable(self):
1436 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1437 self.assertTrue(pair.readable())
1438
1439 def test_writeable(self):
1440 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1441 self.assertTrue(pair.writable())
1442
1443 def test_seekable(self):
1444 # BufferedRWPairs are never seekable, even if their readers and writers
1445 # are.
1446 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1447 self.assertFalse(pair.seekable())
1448
1449 # .flush() is delegated to the underlying writer object and has been
1450 # tested in the test_write method.
1451
1452 def test_close_and_closed(self):
1453 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1454 self.assertFalse(pair.closed)
1455 pair.close()
1456 self.assertTrue(pair.closed)
1457
1458 def test_isatty(self):
1459 class SelectableIsAtty(MockRawIO):
1460 def __init__(self, isatty):
1461 MockRawIO.__init__(self)
1462 self._isatty = isatty
1463
1464 def isatty(self):
1465 return self._isatty
1466
1467 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1468 self.assertFalse(pair.isatty())
1469
1470 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1471 self.assertTrue(pair.isatty())
1472
1473 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1474 self.assertTrue(pair.isatty())
1475
1476 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1477 self.assertTrue(pair.isatty())
1478
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001479 def test_weakref_clearing(self):
1480 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1481 ref = weakref.ref(brw)
1482 brw = None
1483 ref = None # Shouldn't segfault.
1484
Antoine Pitrou19690592009-06-12 20:14:08 +00001485class CBufferedRWPairTest(BufferedRWPairTest):
1486 tp = io.BufferedRWPair
1487
1488class PyBufferedRWPairTest(BufferedRWPairTest):
1489 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001490
1491
Antoine Pitrou19690592009-06-12 20:14:08 +00001492class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1493 read_mode = "rb+"
1494 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001495
Antoine Pitrou19690592009-06-12 20:14:08 +00001496 def test_constructor(self):
1497 BufferedReaderTest.test_constructor(self)
1498 BufferedWriterTest.test_constructor(self)
1499
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001500 def test_uninitialized(self):
1501 BufferedReaderTest.test_uninitialized(self)
1502 BufferedWriterTest.test_uninitialized(self)
1503
Antoine Pitrou19690592009-06-12 20:14:08 +00001504 def test_read_and_write(self):
1505 raw = self.MockRawIO((b"asdf", b"ghjk"))
1506 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001507
1508 self.assertEqual(b"as", rw.read(2))
1509 rw.write(b"ddd")
1510 rw.write(b"eee")
1511 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001512 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001513 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001514
Antoine Pitrou19690592009-06-12 20:14:08 +00001515 def test_seek_and_tell(self):
1516 raw = self.BytesIO(b"asdfghjkl")
1517 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001518
Ezio Melotti2623a372010-11-21 13:34:58 +00001519 self.assertEqual(b"as", rw.read(2))
1520 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001521 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001522 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001523
Antoine Pitrou808cec52011-08-20 15:40:58 +02001524 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001525 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001526 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001527 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001528 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001529 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001530 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001531 self.assertEqual(7, rw.tell())
1532 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001533 rw.flush()
1534 self.assertEqual(b"asdf123fl", raw.getvalue())
1535
Christian Heimes1a6387e2008-03-26 12:49:49 +00001536 self.assertRaises(TypeError, rw.seek, 0.0)
1537
Antoine Pitrou19690592009-06-12 20:14:08 +00001538 def check_flush_and_read(self, read_func):
1539 raw = self.BytesIO(b"abcdefghi")
1540 bufio = self.tp(raw)
1541
Ezio Melotti2623a372010-11-21 13:34:58 +00001542 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001543 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001544 self.assertEqual(b"ef", read_func(bufio, 2))
1545 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001546 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001547 self.assertEqual(6, bufio.tell())
1548 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001549 raw.seek(0, 0)
1550 raw.write(b"XYZ")
1551 # flush() resets the read buffer
1552 bufio.flush()
1553 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001554 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001555
1556 def test_flush_and_read(self):
1557 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1558
1559 def test_flush_and_readinto(self):
1560 def _readinto(bufio, n=-1):
1561 b = bytearray(n if n >= 0 else 9999)
1562 n = bufio.readinto(b)
1563 return bytes(b[:n])
1564 self.check_flush_and_read(_readinto)
1565
1566 def test_flush_and_peek(self):
1567 def _peek(bufio, n=-1):
1568 # This relies on the fact that the buffer can contain the whole
1569 # raw stream, otherwise peek() can return less.
1570 b = bufio.peek(n)
1571 if n != -1:
1572 b = b[:n]
1573 bufio.seek(len(b), 1)
1574 return b
1575 self.check_flush_and_read(_peek)
1576
1577 def test_flush_and_write(self):
1578 raw = self.BytesIO(b"abcdefghi")
1579 bufio = self.tp(raw)
1580
1581 bufio.write(b"123")
1582 bufio.flush()
1583 bufio.write(b"45")
1584 bufio.flush()
1585 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001586 self.assertEqual(b"12345fghi", raw.getvalue())
1587 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001588
1589 def test_threads(self):
1590 BufferedReaderTest.test_threads(self)
1591 BufferedWriterTest.test_threads(self)
1592
1593 def test_writes_and_peek(self):
1594 def _peek(bufio):
1595 bufio.peek(1)
1596 self.check_writes(_peek)
1597 def _peek(bufio):
1598 pos = bufio.tell()
1599 bufio.seek(-1, 1)
1600 bufio.peek(1)
1601 bufio.seek(pos, 0)
1602 self.check_writes(_peek)
1603
1604 def test_writes_and_reads(self):
1605 def _read(bufio):
1606 bufio.seek(-1, 1)
1607 bufio.read(1)
1608 self.check_writes(_read)
1609
1610 def test_writes_and_read1s(self):
1611 def _read1(bufio):
1612 bufio.seek(-1, 1)
1613 bufio.read1(1)
1614 self.check_writes(_read1)
1615
1616 def test_writes_and_readintos(self):
1617 def _read(bufio):
1618 bufio.seek(-1, 1)
1619 bufio.readinto(bytearray(1))
1620 self.check_writes(_read)
1621
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001622 def test_write_after_readahead(self):
1623 # Issue #6629: writing after the buffer was filled by readahead should
1624 # first rewind the raw stream.
1625 for overwrite_size in [1, 5]:
1626 raw = self.BytesIO(b"A" * 10)
1627 bufio = self.tp(raw, 4)
1628 # Trigger readahead
1629 self.assertEqual(bufio.read(1), b"A")
1630 self.assertEqual(bufio.tell(), 1)
1631 # Overwriting should rewind the raw stream if it needs so
1632 bufio.write(b"B" * overwrite_size)
1633 self.assertEqual(bufio.tell(), overwrite_size + 1)
1634 # If the write size was smaller than the buffer size, flush() and
1635 # check that rewind happens.
1636 bufio.flush()
1637 self.assertEqual(bufio.tell(), overwrite_size + 1)
1638 s = raw.getvalue()
1639 self.assertEqual(s,
1640 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1641
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001642 def test_write_rewind_write(self):
1643 # Various combinations of reading / writing / seeking backwards / writing again
1644 def mutate(bufio, pos1, pos2):
1645 assert pos2 >= pos1
1646 # Fill the buffer
1647 bufio.seek(pos1)
1648 bufio.read(pos2 - pos1)
1649 bufio.write(b'\x02')
1650 # This writes earlier than the previous write, but still inside
1651 # the buffer.
1652 bufio.seek(pos1)
1653 bufio.write(b'\x01')
1654
1655 b = b"\x80\x81\x82\x83\x84"
1656 for i in range(0, len(b)):
1657 for j in range(i, len(b)):
1658 raw = self.BytesIO(b)
1659 bufio = self.tp(raw, 100)
1660 mutate(bufio, i, j)
1661 bufio.flush()
1662 expected = bytearray(b)
1663 expected[j] = 2
1664 expected[i] = 1
1665 self.assertEqual(raw.getvalue(), expected,
1666 "failed result for i=%d, j=%d" % (i, j))
1667
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001668 def test_truncate_after_read_or_write(self):
1669 raw = self.BytesIO(b"A" * 10)
1670 bufio = self.tp(raw, 100)
1671 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1672 self.assertEqual(bufio.truncate(), 2)
1673 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1674 self.assertEqual(bufio.truncate(), 4)
1675
Antoine Pitrou19690592009-06-12 20:14:08 +00001676 def test_misbehaved_io(self):
1677 BufferedReaderTest.test_misbehaved_io(self)
1678 BufferedWriterTest.test_misbehaved_io(self)
1679
Antoine Pitrou808cec52011-08-20 15:40:58 +02001680 def test_interleaved_read_write(self):
1681 # Test for issue #12213
1682 with self.BytesIO(b'abcdefgh') as raw:
1683 with self.tp(raw, 100) as f:
1684 f.write(b"1")
1685 self.assertEqual(f.read(1), b'b')
1686 f.write(b'2')
1687 self.assertEqual(f.read1(1), b'd')
1688 f.write(b'3')
1689 buf = bytearray(1)
1690 f.readinto(buf)
1691 self.assertEqual(buf, b'f')
1692 f.write(b'4')
1693 self.assertEqual(f.peek(1), b'h')
1694 f.flush()
1695 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1696
1697 with self.BytesIO(b'abc') as raw:
1698 with self.tp(raw, 100) as f:
1699 self.assertEqual(f.read(1), b'a')
1700 f.write(b"2")
1701 self.assertEqual(f.read(1), b'c')
1702 f.flush()
1703 self.assertEqual(raw.getvalue(), b'a2c')
1704
1705 def test_interleaved_readline_write(self):
1706 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1707 with self.tp(raw) as f:
1708 f.write(b'1')
1709 self.assertEqual(f.readline(), b'b\n')
1710 f.write(b'2')
1711 self.assertEqual(f.readline(), b'def\n')
1712 f.write(b'3')
1713 self.assertEqual(f.readline(), b'\n')
1714 f.flush()
1715 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1716
R David Murray5b2cf5e2013-02-23 22:11:21 -05001717
Antoine Pitroubff5df02012-07-29 19:02:46 +02001718class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1719 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001720 tp = io.BufferedRandom
1721
1722 def test_constructor(self):
1723 BufferedRandomTest.test_constructor(self)
1724 # The allocation can succeed on 32-bit builds, e.g. with more
1725 # than 2GB RAM and a 64-bit kernel.
1726 if sys.maxsize > 0x7FFFFFFF:
1727 rawio = self.MockRawIO()
1728 bufio = self.tp(rawio)
1729 self.assertRaises((OverflowError, MemoryError, ValueError),
1730 bufio.__init__, rawio, sys.maxsize)
1731
1732 def test_garbage_collection(self):
1733 CBufferedReaderTest.test_garbage_collection(self)
1734 CBufferedWriterTest.test_garbage_collection(self)
1735
R David Murray5b2cf5e2013-02-23 22:11:21 -05001736 def test_args_error(self):
1737 # Issue #17275
1738 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1739 self.tp(io.BytesIO(), 1024, 1024, 1024)
1740
1741
Antoine Pitrou19690592009-06-12 20:14:08 +00001742class PyBufferedRandomTest(BufferedRandomTest):
1743 tp = pyio.BufferedRandom
1744
1745
Christian Heimes1a6387e2008-03-26 12:49:49 +00001746# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1747# properties:
1748# - A single output character can correspond to many bytes of input.
1749# - The number of input bytes to complete the character can be
1750# undetermined until the last input byte is received.
1751# - The number of input bytes can vary depending on previous input.
1752# - A single input byte can correspond to many characters of output.
1753# - The number of output characters can be undetermined until the
1754# last input byte is received.
1755# - The number of output characters can vary depending on previous input.
1756
1757class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1758 """
1759 For testing seek/tell behavior with a stateful, buffering decoder.
1760
1761 Input is a sequence of words. Words may be fixed-length (length set
1762 by input) or variable-length (period-terminated). In variable-length
1763 mode, extra periods are ignored. Possible words are:
1764 - 'i' followed by a number sets the input length, I (maximum 99).
1765 When I is set to 0, words are space-terminated.
1766 - 'o' followed by a number sets the output length, O (maximum 99).
1767 - Any other word is converted into a word followed by a period on
1768 the output. The output word consists of the input word truncated
1769 or padded out with hyphens to make its length equal to O. If O
1770 is 0, the word is output verbatim without truncating or padding.
1771 I and O are initially set to 1. When I changes, any buffered input is
1772 re-scanned according to the new I. EOF also terminates the last word.
1773 """
1774
1775 def __init__(self, errors='strict'):
1776 codecs.IncrementalDecoder.__init__(self, errors)
1777 self.reset()
1778
1779 def __repr__(self):
1780 return '<SID %x>' % id(self)
1781
1782 def reset(self):
1783 self.i = 1
1784 self.o = 1
1785 self.buffer = bytearray()
1786
1787 def getstate(self):
1788 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1789 return bytes(self.buffer), i*100 + o
1790
1791 def setstate(self, state):
1792 buffer, io = state
1793 self.buffer = bytearray(buffer)
1794 i, o = divmod(io, 100)
1795 self.i, self.o = i ^ 1, o ^ 1
1796
1797 def decode(self, input, final=False):
1798 output = ''
1799 for b in input:
1800 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001801 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001802 if self.buffer:
1803 output += self.process_word()
1804 else:
1805 self.buffer.append(b)
1806 else: # fixed-length, terminate after self.i bytes
1807 self.buffer.append(b)
1808 if len(self.buffer) == self.i:
1809 output += self.process_word()
1810 if final and self.buffer: # EOF terminates the last word
1811 output += self.process_word()
1812 return output
1813
1814 def process_word(self):
1815 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001816 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001817 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001818 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001819 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1820 else:
1821 output = self.buffer.decode('ascii')
1822 if len(output) < self.o:
1823 output += '-'*self.o # pad out with hyphens
1824 if self.o:
1825 output = output[:self.o] # truncate to output length
1826 output += '.'
1827 self.buffer = bytearray()
1828 return output
1829
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001830 codecEnabled = False
1831
1832 @classmethod
1833 def lookupTestDecoder(cls, name):
1834 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001835 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001836 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001837 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001838 incrementalencoder=None,
1839 streamreader=None, streamwriter=None,
1840 incrementaldecoder=cls)
1841
1842# Register the previous decoder for testing.
1843# Disabled by default, tests will enable it.
1844codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1845
1846
Christian Heimes1a6387e2008-03-26 12:49:49 +00001847class StatefulIncrementalDecoderTest(unittest.TestCase):
1848 """
1849 Make sure the StatefulIncrementalDecoder actually works.
1850 """
1851
1852 test_cases = [
1853 # I=1, O=1 (fixed-length input == fixed-length output)
1854 (b'abcd', False, 'a.b.c.d.'),
1855 # I=0, O=0 (variable-length input, variable-length output)
1856 (b'oiabcd', True, 'abcd.'),
1857 # I=0, O=0 (should ignore extra periods)
1858 (b'oi...abcd...', True, 'abcd.'),
1859 # I=0, O=6 (variable-length input, fixed-length output)
1860 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1861 # I=2, O=6 (fixed-length input < fixed-length output)
1862 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1863 # I=6, O=3 (fixed-length input > fixed-length output)
1864 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1865 # I=0, then 3; O=29, then 15 (with longer output)
1866 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1867 'a----------------------------.' +
1868 'b----------------------------.' +
1869 'cde--------------------------.' +
1870 'abcdefghijabcde.' +
1871 'a.b------------.' +
1872 '.c.------------.' +
1873 'd.e------------.' +
1874 'k--------------.' +
1875 'l--------------.' +
1876 'm--------------.')
1877 ]
1878
Antoine Pitrou19690592009-06-12 20:14:08 +00001879 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001880 # Try a few one-shot test cases.
1881 for input, eof, output in self.test_cases:
1882 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001883 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001884
1885 # Also test an unfinished decode, followed by forcing EOF.
1886 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001887 self.assertEqual(d.decode(b'oiabcd'), '')
1888 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889
1890class TextIOWrapperTest(unittest.TestCase):
1891
1892 def setUp(self):
1893 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1894 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001895 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896
1897 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001898 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899
Antoine Pitrou19690592009-06-12 20:14:08 +00001900 def test_constructor(self):
1901 r = self.BytesIO(b"\xc3\xa9\n\n")
1902 b = self.BufferedReader(r, 1000)
1903 t = self.TextIOWrapper(b)
1904 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001905 self.assertEqual(t.encoding, "latin1")
1906 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001907 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001908 self.assertEqual(t.encoding, "utf8")
1909 self.assertEqual(t.line_buffering, True)
1910 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001911 self.assertRaises(TypeError, t.__init__, b, newline=42)
1912 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1913
1914 def test_detach(self):
1915 r = self.BytesIO()
1916 b = self.BufferedWriter(r)
1917 t = self.TextIOWrapper(b)
1918 self.assertIs(t.detach(), b)
1919
1920 t = self.TextIOWrapper(b, encoding="ascii")
1921 t.write("howdy")
1922 self.assertFalse(r.getvalue())
1923 t.detach()
1924 self.assertEqual(r.getvalue(), b"howdy")
1925 self.assertRaises(ValueError, t.detach)
1926
Benjamin Peterson53ae6142014-12-21 20:51:50 -06001927 # Operations independent of the detached stream should still work
1928 repr(t)
1929 self.assertEqual(t.encoding, "ascii")
1930 self.assertEqual(t.errors, "strict")
1931 self.assertFalse(t.line_buffering)
1932
Antoine Pitrou19690592009-06-12 20:14:08 +00001933 def test_repr(self):
1934 raw = self.BytesIO("hello".encode("utf-8"))
1935 b = self.BufferedReader(raw)
1936 t = self.TextIOWrapper(b, encoding="utf-8")
1937 modname = self.TextIOWrapper.__module__
1938 self.assertEqual(repr(t),
1939 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1940 raw.name = "dummy"
1941 self.assertEqual(repr(t),
1942 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1943 raw.name = b"dummy"
1944 self.assertEqual(repr(t),
1945 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1946
Benjamin Peterson53ae6142014-12-21 20:51:50 -06001947 t.buffer.detach()
1948 repr(t) # Should not raise an exception
1949
Antoine Pitrou19690592009-06-12 20:14:08 +00001950 def test_line_buffering(self):
1951 r = self.BytesIO()
1952 b = self.BufferedWriter(r, 1000)
1953 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1954 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001955 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001956 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001957 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001958 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001959 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001960
Antoine Pitrou19690592009-06-12 20:14:08 +00001961 def test_encoding(self):
1962 # Check the encoding attribute is always set, and valid
1963 b = self.BytesIO()
1964 t = self.TextIOWrapper(b, encoding="utf8")
1965 self.assertEqual(t.encoding, "utf8")
1966 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001967 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001968 codecs.lookup(t.encoding)
1969
1970 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001971 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001972 b = self.BytesIO(b"abc\n\xff\n")
1973 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001974 self.assertRaises(UnicodeError, t.read)
1975 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001976 b = self.BytesIO(b"abc\n\xff\n")
1977 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978 self.assertRaises(UnicodeError, t.read)
1979 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001980 b = self.BytesIO(b"abc\n\xff\n")
1981 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001982 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 b = self.BytesIO(b"abc\n\xff\n")
1985 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001986 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001987
Antoine Pitrou19690592009-06-12 20:14:08 +00001988 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001989 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 b = self.BytesIO()
1991 t = self.TextIOWrapper(b, encoding="ascii")
1992 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001993 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 b = self.BytesIO()
1995 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1996 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001997 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001998 b = self.BytesIO()
1999 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002001 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002002 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002003 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002004 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00002005 b = self.BytesIO()
2006 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00002007 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00002008 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002009 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002010 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002011
Antoine Pitrou19690592009-06-12 20:14:08 +00002012 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002013 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2014
2015 tests = [
2016 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2017 [ '', input_lines ],
2018 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2019 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2020 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2021 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002022 encodings = (
2023 'utf-8', 'latin-1',
2024 'utf-16', 'utf-16-le', 'utf-16-be',
2025 'utf-32', 'utf-32-le', 'utf-32-be',
2026 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027
2028 # Try a range of buffer sizes to test the case where \r is the last
2029 # character in TextIOWrapper._pending_line.
2030 for encoding in encodings:
2031 # XXX: str.encode() should return bytes
2032 data = bytes(''.join(input_lines).encode(encoding))
2033 for do_reads in (False, True):
2034 for bufsize in range(1, 10):
2035 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002036 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2037 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002038 encoding=encoding)
2039 if do_reads:
2040 got_lines = []
2041 while True:
2042 c2 = textio.read(2)
2043 if c2 == '':
2044 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002045 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002046 got_lines.append(c2 + textio.readline())
2047 else:
2048 got_lines = list(textio)
2049
2050 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002051 self.assertEqual(got_line, exp_line)
2052 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053
Antoine Pitrou19690592009-06-12 20:14:08 +00002054 def test_newlines_input(self):
2055 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002056 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2057 for newline, expected in [
2058 (None, normalized.decode("ascii").splitlines(True)),
2059 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002060 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2061 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2062 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002063 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002064 buf = self.BytesIO(testdata)
2065 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002066 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002067 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002068 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069
Antoine Pitrou19690592009-06-12 20:14:08 +00002070 def test_newlines_output(self):
2071 testdict = {
2072 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2073 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2074 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2075 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2076 }
2077 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2078 for newline, expected in tests:
2079 buf = self.BytesIO()
2080 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2081 txt.write("AAA\nB")
2082 txt.write("BB\nCCC\n")
2083 txt.write("X\rY\r\nZ")
2084 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002085 self.assertEqual(buf.closed, False)
2086 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002087
2088 def test_destructor(self):
2089 l = []
2090 base = self.BytesIO
2091 class MyBytesIO(base):
2092 def close(self):
2093 l.append(self.getvalue())
2094 base.close(self)
2095 b = MyBytesIO()
2096 t = self.TextIOWrapper(b, encoding="ascii")
2097 t.write("abc")
2098 del t
2099 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002100 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002101
2102 def test_override_destructor(self):
2103 record = []
2104 class MyTextIO(self.TextIOWrapper):
2105 def __del__(self):
2106 record.append(1)
2107 try:
2108 f = super(MyTextIO, self).__del__
2109 except AttributeError:
2110 pass
2111 else:
2112 f()
2113 def close(self):
2114 record.append(2)
2115 super(MyTextIO, self).close()
2116 def flush(self):
2117 record.append(3)
2118 super(MyTextIO, self).flush()
2119 b = self.BytesIO()
2120 t = MyTextIO(b, encoding="ascii")
2121 del t
2122 support.gc_collect()
2123 self.assertEqual(record, [1, 2, 3])
2124
2125 def test_error_through_destructor(self):
2126 # Test that the exception state is not modified by a destructor,
2127 # even if close() fails.
2128 rawio = self.CloseFailureIO()
2129 def f():
2130 self.TextIOWrapper(rawio).xyzzy
2131 with support.captured_output("stderr") as s:
2132 self.assertRaises(AttributeError, f)
2133 s = s.getvalue().strip()
2134 if s:
2135 # The destructor *may* have printed an unraisable error, check it
2136 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002137 self.assertTrue(s.startswith("Exception IOError: "), s)
2138 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139
2140 # Systematic tests of the text I/O API
2141
Antoine Pitrou19690592009-06-12 20:14:08 +00002142 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2144 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002145 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002146 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002147 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002148 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002149 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002150 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002151 self.assertEqual(f.tell(), 0)
2152 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002153 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002154 self.assertEqual(f.seek(0), 0)
2155 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002156 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002157 self.assertEqual(f.read(2), "ab")
2158 self.assertEqual(f.read(1), "c")
2159 self.assertEqual(f.read(1), "")
2160 self.assertEqual(f.read(), "")
2161 self.assertEqual(f.tell(), cookie)
2162 self.assertEqual(f.seek(0), 0)
2163 self.assertEqual(f.seek(0, 2), cookie)
2164 self.assertEqual(f.write("def"), 3)
2165 self.assertEqual(f.seek(cookie), cookie)
2166 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002167 if enc.startswith("utf"):
2168 self.multi_line_test(f, enc)
2169 f.close()
2170
2171 def multi_line_test(self, f, enc):
2172 f.seek(0)
2173 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002174 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002175 wlines = []
2176 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2177 chars = []
2178 for i in range(size):
2179 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002180 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181 wlines.append((f.tell(), line))
2182 f.write(line)
2183 f.seek(0)
2184 rlines = []
2185 while True:
2186 pos = f.tell()
2187 line = f.readline()
2188 if not line:
2189 break
2190 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002191 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002192
Antoine Pitrou19690592009-06-12 20:14:08 +00002193 def test_telling(self):
2194 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002195 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002196 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002197 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002198 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002199 p2 = f.tell()
2200 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002201 self.assertEqual(f.tell(), p0)
2202 self.assertEqual(f.readline(), "\xff\n")
2203 self.assertEqual(f.tell(), p1)
2204 self.assertEqual(f.readline(), "\xff\n")
2205 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002206 f.seek(0)
2207 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002208 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002209 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002210 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002211 f.close()
2212
Antoine Pitrou19690592009-06-12 20:14:08 +00002213 def test_seeking(self):
2214 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002215 prefix_size = chunk_size - 2
2216 u_prefix = "a" * prefix_size
2217 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002218 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002219 u_suffix = "\u8888\n"
2220 suffix = bytes(u_suffix.encode("utf-8"))
2221 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002222 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002223 f.write(line*2)
2224 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002225 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002226 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002227 self.assertEqual(s, prefix.decode("ascii"))
2228 self.assertEqual(f.tell(), prefix_size)
2229 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002230
Antoine Pitrou19690592009-06-12 20:14:08 +00002231 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002232 # Regression test for a specific bug
2233 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002234 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235 f.write(data)
2236 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002237 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002238 f._CHUNK_SIZE # Just test that it exists
2239 f._CHUNK_SIZE = 2
2240 f.readline()
2241 f.tell()
2242
Antoine Pitrou19690592009-06-12 20:14:08 +00002243 def test_seek_and_tell(self):
2244 #Test seek/tell using the StatefulIncrementalDecoder.
2245 # Make test faster by doing smaller seeks
2246 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002247
Antoine Pitrou19690592009-06-12 20:14:08 +00002248 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002249 """Tell/seek to various points within a data stream and ensure
2250 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002251 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002252 f.write(data)
2253 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002254 f = self.open(support.TESTFN, encoding='test_decoder')
2255 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002256 decoded = f.read()
2257 f.close()
2258
2259 for i in range(min_pos, len(decoded) + 1): # seek positions
2260 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002261 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002262 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002263 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002264 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002265 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002266 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002267 f.close()
2268
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002269 # Enable the test decoder.
2270 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002271
2272 # Run the tests.
2273 try:
2274 # Try each test case.
2275 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002276 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002277
2278 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002279 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2280 offset = CHUNK_SIZE - len(input)//2
2281 prefix = b'.'*offset
2282 # Don't bother seeking into the prefix (takes too long).
2283 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002284 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002285
2286 # Ensure our test decoder won't interfere with subsequent tests.
2287 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002288 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002289
Antoine Pitrou19690592009-06-12 20:14:08 +00002290 def test_encoded_writes(self):
2291 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002292 tests = ("utf-16",
2293 "utf-16-le",
2294 "utf-16-be",
2295 "utf-32",
2296 "utf-32-le",
2297 "utf-32-be")
2298 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002299 buf = self.BytesIO()
2300 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002301 # Check if the BOM is written only once (see issue1753).
2302 f.write(data)
2303 f.write(data)
2304 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002305 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002306 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002307 self.assertEqual(f.read(), data * 2)
2308 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002309
Antoine Pitrou19690592009-06-12 20:14:08 +00002310 def test_unreadable(self):
2311 class UnReadable(self.BytesIO):
2312 def readable(self):
2313 return False
2314 txt = self.TextIOWrapper(UnReadable())
2315 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002316
Antoine Pitrou19690592009-06-12 20:14:08 +00002317 def test_read_one_by_one(self):
2318 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002319 reads = ""
2320 while True:
2321 c = txt.read(1)
2322 if not c:
2323 break
2324 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002325 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002326
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002327 def test_readlines(self):
2328 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2329 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2330 txt.seek(0)
2331 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2332 txt.seek(0)
2333 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2334
Christian Heimes1a6387e2008-03-26 12:49:49 +00002335 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002336 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002337 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002338 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002339 reads = ""
2340 while True:
2341 c = txt.read(128)
2342 if not c:
2343 break
2344 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002345 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002346
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002347 def test_writelines(self):
2348 l = ['ab', 'cd', 'ef']
2349 buf = self.BytesIO()
2350 txt = self.TextIOWrapper(buf)
2351 txt.writelines(l)
2352 txt.flush()
2353 self.assertEqual(buf.getvalue(), b'abcdef')
2354
2355 def test_writelines_userlist(self):
2356 l = UserList(['ab', 'cd', 'ef'])
2357 buf = self.BytesIO()
2358 txt = self.TextIOWrapper(buf)
2359 txt.writelines(l)
2360 txt.flush()
2361 self.assertEqual(buf.getvalue(), b'abcdef')
2362
2363 def test_writelines_error(self):
2364 txt = self.TextIOWrapper(self.BytesIO())
2365 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2366 self.assertRaises(TypeError, txt.writelines, None)
2367 self.assertRaises(TypeError, txt.writelines, b'abc')
2368
Christian Heimes1a6387e2008-03-26 12:49:49 +00002369 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002370 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002371
2372 # read one char at a time
2373 reads = ""
2374 while True:
2375 c = txt.read(1)
2376 if not c:
2377 break
2378 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002379 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002380
2381 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002382 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002383 txt._CHUNK_SIZE = 4
2384
2385 reads = ""
2386 while True:
2387 c = txt.read(4)
2388 if not c:
2389 break
2390 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002391 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002392
2393 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002394 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002395 txt._CHUNK_SIZE = 4
2396
2397 reads = txt.read(4)
2398 reads += txt.read(4)
2399 reads += txt.readline()
2400 reads += txt.readline()
2401 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002402 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002403
2404 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002405 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002406 txt._CHUNK_SIZE = 4
2407
2408 reads = txt.read(4)
2409 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002410 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002411
2412 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002413 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002414 txt._CHUNK_SIZE = 4
2415
2416 reads = txt.read(4)
2417 pos = txt.tell()
2418 txt.seek(0)
2419 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002420 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002421
2422 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002423 buffer = self.BytesIO(self.testdata)
2424 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002425
2426 self.assertEqual(buffer.seekable(), txt.seekable())
2427
Antoine Pitrou19690592009-06-12 20:14:08 +00002428 def test_append_bom(self):
2429 # The BOM is not written again when appending to a non-empty file
2430 filename = support.TESTFN
2431 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2432 with self.open(filename, 'w', encoding=charset) as f:
2433 f.write('aaa')
2434 pos = f.tell()
2435 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002436 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002437
2438 with self.open(filename, 'a', encoding=charset) as f:
2439 f.write('xxx')
2440 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002441 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002442
Antoine Pitrou19690592009-06-12 20:14:08 +00002443 def test_seek_bom(self):
2444 # Same test, but when seeking manually
2445 filename = support.TESTFN
2446 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2447 with self.open(filename, 'w', encoding=charset) as f:
2448 f.write('aaa')
2449 pos = f.tell()
2450 with self.open(filename, 'r+', encoding=charset) as f:
2451 f.seek(pos)
2452 f.write('zzz')
2453 f.seek(0)
2454 f.write('bbb')
2455 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002456 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002457
2458 def test_errors_property(self):
2459 with self.open(support.TESTFN, "w") as f:
2460 self.assertEqual(f.errors, "strict")
2461 with self.open(support.TESTFN, "w", errors="replace") as f:
2462 self.assertEqual(f.errors, "replace")
2463
Victor Stinner6a102812010-04-27 23:55:59 +00002464 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002465 def test_threads_write(self):
2466 # Issue6750: concurrent writes could duplicate data
2467 event = threading.Event()
2468 with self.open(support.TESTFN, "w", buffering=1) as f:
2469 def run(n):
2470 text = "Thread%03d\n" % n
2471 event.wait()
2472 f.write(text)
2473 threads = [threading.Thread(target=lambda n=x: run(n))
2474 for x in range(20)]
2475 for t in threads:
2476 t.start()
2477 time.sleep(0.02)
2478 event.set()
2479 for t in threads:
2480 t.join()
2481 with self.open(support.TESTFN) as f:
2482 content = f.read()
2483 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002484 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002485
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002486 def test_flush_error_on_close(self):
2487 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2488 def bad_flush():
2489 raise IOError()
2490 txt.flush = bad_flush
2491 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002492 self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002493
2494 def test_multi_close(self):
2495 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2496 txt.close()
2497 txt.close()
2498 txt.close()
2499 self.assertRaises(ValueError, txt.flush)
2500
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002501 def test_readonly_attributes(self):
2502 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2503 buf = self.BytesIO(self.testdata)
2504 with self.assertRaises((AttributeError, TypeError)):
2505 txt.buffer = buf
2506
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002507 def test_read_nonbytes(self):
2508 # Issue #17106
2509 # Crash when underlying read() returns non-bytes
2510 class NonbytesStream(self.StringIO):
2511 read1 = self.StringIO.read
2512 class NonbytesStream(self.StringIO):
2513 read1 = self.StringIO.read
2514 t = self.TextIOWrapper(NonbytesStream('a'))
2515 with self.maybeRaises(TypeError):
2516 t.read(1)
2517 t = self.TextIOWrapper(NonbytesStream('a'))
2518 with self.maybeRaises(TypeError):
2519 t.readline()
2520 t = self.TextIOWrapper(NonbytesStream('a'))
2521 self.assertEqual(t.read(), u'a')
2522
2523 def test_illegal_decoder(self):
2524 # Issue #17106
2525 # Crash when decoder returns non-string
2526 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2527 encoding='quopri_codec')
2528 with self.maybeRaises(TypeError):
2529 t.read(1)
2530 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2531 encoding='quopri_codec')
2532 with self.maybeRaises(TypeError):
2533 t.readline()
2534 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2535 encoding='quopri_codec')
2536 with self.maybeRaises(TypeError):
2537 t.read()
2538
2539
Antoine Pitrou19690592009-06-12 20:14:08 +00002540class CTextIOWrapperTest(TextIOWrapperTest):
2541
2542 def test_initialization(self):
2543 r = self.BytesIO(b"\xc3\xa9\n\n")
2544 b = self.BufferedReader(r, 1000)
2545 t = self.TextIOWrapper(b)
2546 self.assertRaises(TypeError, t.__init__, b, newline=42)
2547 self.assertRaises(ValueError, t.read)
2548 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2549 self.assertRaises(ValueError, t.read)
2550
Benjamin Peterson53ae6142014-12-21 20:51:50 -06002551 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2552 self.assertRaises(Exception, repr, t)
2553
Antoine Pitrou19690592009-06-12 20:14:08 +00002554 def test_garbage_collection(self):
2555 # C TextIOWrapper objects are collected, and collecting them flushes
2556 # all data to disk.
2557 # The Python version has __del__, so it ends in gc.garbage instead.
2558 rawio = io.FileIO(support.TESTFN, "wb")
2559 b = self.BufferedWriter(rawio)
2560 t = self.TextIOWrapper(b, encoding="ascii")
2561 t.write("456def")
2562 t.x = t
2563 wr = weakref.ref(t)
2564 del t
2565 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002567 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002568 self.assertEqual(f.read(), b"456def")
2569
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002570 def test_rwpair_cleared_before_textio(self):
2571 # Issue 13070: TextIOWrapper's finalization would crash when called
2572 # after the reference to the underlying BufferedRWPair's writer got
2573 # cleared by the GC.
2574 for i in range(1000):
2575 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2576 t1 = self.TextIOWrapper(b1, encoding="ascii")
2577 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2578 t2 = self.TextIOWrapper(b2, encoding="ascii")
2579 # circular references
2580 t1.buddy = t2
2581 t2.buddy = t1
2582 support.gc_collect()
2583
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002584 maybeRaises = unittest.TestCase.assertRaises
2585
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002586
Antoine Pitrou19690592009-06-12 20:14:08 +00002587class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002588 @contextlib.contextmanager
2589 def maybeRaises(self, *args, **kwds):
2590 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002591
2592
2593class IncrementalNewlineDecoderTest(unittest.TestCase):
2594
2595 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002596 # UTF-8 specific tests for a newline decoder
2597 def _check_decode(b, s, **kwargs):
2598 # We exercise getstate() / setstate() as well as decode()
2599 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002600 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002601 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002602 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002603
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002604 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002605
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002606 _check_decode(b'\xe8', "")
2607 _check_decode(b'\xa2', "")
2608 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002609
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002610 _check_decode(b'\xe8', "")
2611 _check_decode(b'\xa2', "")
2612 _check_decode(b'\x88', "\u8888")
2613
2614 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002615 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2616
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002617 decoder.reset()
2618 _check_decode(b'\n', "\n")
2619 _check_decode(b'\r', "")
2620 _check_decode(b'', "\n", final=True)
2621 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002622
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002623 _check_decode(b'\r', "")
2624 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002625
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002626 _check_decode(b'\r\r\n', "\n\n")
2627 _check_decode(b'\r', "")
2628 _check_decode(b'\r', "\n")
2629 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002630
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002631 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2632 _check_decode(b'\xe8\xa2\x88', "\u8888")
2633 _check_decode(b'\n', "\n")
2634 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2635 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002636
Antoine Pitrou19690592009-06-12 20:14:08 +00002637 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002638 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002639 if encoding is not None:
2640 encoder = codecs.getincrementalencoder(encoding)()
2641 def _decode_bytewise(s):
2642 # Decode one byte at a time
2643 for b in encoder.encode(s):
2644 result.append(decoder.decode(b))
2645 else:
2646 encoder = None
2647 def _decode_bytewise(s):
2648 # Decode one char at a time
2649 for c in s:
2650 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002651 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002652 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002653 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002654 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002655 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002656 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002657 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002658 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002659 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002660 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002661 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002662 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002663 input = "abc"
2664 if encoder is not None:
2665 encoder.reset()
2666 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002667 self.assertEqual(decoder.decode(input), "abc")
2668 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002669
2670 def test_newline_decoder(self):
2671 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002672 # None meaning the IncrementalNewlineDecoder takes unicode input
2673 # rather than bytes input
2674 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002675 'utf-16', 'utf-16-le', 'utf-16-be',
2676 'utf-32', 'utf-32-le', 'utf-32-be',
2677 )
2678 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002679 decoder = enc and codecs.getincrementaldecoder(enc)()
2680 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2681 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002682 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002683 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2684 self.check_newline_decoding_utf8(decoder)
2685
2686 def test_newline_bytes(self):
2687 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2688 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002689 self.assertEqual(dec.newlines, None)
2690 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2691 self.assertEqual(dec.newlines, None)
2692 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2693 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002694 dec = self.IncrementalNewlineDecoder(None, translate=False)
2695 _check(dec)
2696 dec = self.IncrementalNewlineDecoder(None, translate=True)
2697 _check(dec)
2698
2699class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2700 pass
2701
2702class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2703 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002704
Christian Heimes1a6387e2008-03-26 12:49:49 +00002705
2706# XXX Tests for open()
2707
2708class MiscIOTest(unittest.TestCase):
2709
Benjamin Petersonad100c32008-11-20 22:06:22 +00002710 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002711 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002712
Antoine Pitrou19690592009-06-12 20:14:08 +00002713 def test___all__(self):
2714 for name in self.io.__all__:
2715 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002716 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002717 if name == "open":
2718 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002719 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002720 self.assertTrue(issubclass(obj, Exception), name)
2721 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002722 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002723
Benjamin Petersonad100c32008-11-20 22:06:22 +00002724 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002725 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002726 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002727 f.close()
2728
Antoine Pitrou19690592009-06-12 20:14:08 +00002729 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002730 self.assertEqual(f.name, support.TESTFN)
2731 self.assertEqual(f.buffer.name, support.TESTFN)
2732 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2733 self.assertEqual(f.mode, "U")
2734 self.assertEqual(f.buffer.mode, "rb")
2735 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002736 f.close()
2737
Antoine Pitrou19690592009-06-12 20:14:08 +00002738 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002739 self.assertEqual(f.mode, "w+")
2740 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2741 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002742
Antoine Pitrou19690592009-06-12 20:14:08 +00002743 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002744 self.assertEqual(g.mode, "wb")
2745 self.assertEqual(g.raw.mode, "wb")
2746 self.assertEqual(g.name, f.fileno())
2747 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002748 f.close()
2749 g.close()
2750
Antoine Pitrou19690592009-06-12 20:14:08 +00002751 def test_io_after_close(self):
2752 for kwargs in [
2753 {"mode": "w"},
2754 {"mode": "wb"},
2755 {"mode": "w", "buffering": 1},
2756 {"mode": "w", "buffering": 2},
2757 {"mode": "wb", "buffering": 0},
2758 {"mode": "r"},
2759 {"mode": "rb"},
2760 {"mode": "r", "buffering": 1},
2761 {"mode": "r", "buffering": 2},
2762 {"mode": "rb", "buffering": 0},
2763 {"mode": "w+"},
2764 {"mode": "w+b"},
2765 {"mode": "w+", "buffering": 1},
2766 {"mode": "w+", "buffering": 2},
2767 {"mode": "w+b", "buffering": 0},
2768 ]:
2769 f = self.open(support.TESTFN, **kwargs)
2770 f.close()
2771 self.assertRaises(ValueError, f.flush)
2772 self.assertRaises(ValueError, f.fileno)
2773 self.assertRaises(ValueError, f.isatty)
2774 self.assertRaises(ValueError, f.__iter__)
2775 if hasattr(f, "peek"):
2776 self.assertRaises(ValueError, f.peek, 1)
2777 self.assertRaises(ValueError, f.read)
2778 if hasattr(f, "read1"):
2779 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002780 if hasattr(f, "readall"):
2781 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002782 if hasattr(f, "readinto"):
2783 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2784 self.assertRaises(ValueError, f.readline)
2785 self.assertRaises(ValueError, f.readlines)
2786 self.assertRaises(ValueError, f.seek, 0)
2787 self.assertRaises(ValueError, f.tell)
2788 self.assertRaises(ValueError, f.truncate)
2789 self.assertRaises(ValueError, f.write,
2790 b"" if "b" in kwargs['mode'] else "")
2791 self.assertRaises(ValueError, f.writelines, [])
2792 self.assertRaises(ValueError, next, f)
2793
2794 def test_blockingioerror(self):
2795 # Various BlockingIOError issues
2796 self.assertRaises(TypeError, self.BlockingIOError)
2797 self.assertRaises(TypeError, self.BlockingIOError, 1)
2798 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2799 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2800 b = self.BlockingIOError(1, "")
2801 self.assertEqual(b.characters_written, 0)
2802 class C(unicode):
2803 pass
2804 c = C("")
2805 b = self.BlockingIOError(1, c)
2806 c.b = b
2807 b.c = c
2808 wr = weakref.ref(c)
2809 del c, b
2810 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002811 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002812
2813 def test_abcs(self):
2814 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002815 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2816 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2817 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2818 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002819
2820 def _check_abc_inheritance(self, abcmodule):
2821 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002822 self.assertIsInstance(f, abcmodule.IOBase)
2823 self.assertIsInstance(f, abcmodule.RawIOBase)
2824 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2825 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002826 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002827 self.assertIsInstance(f, abcmodule.IOBase)
2828 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2829 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2830 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002831 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002832 self.assertIsInstance(f, abcmodule.IOBase)
2833 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2834 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2835 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002836
2837 def test_abc_inheritance(self):
2838 # Test implementations inherit from their respective ABCs
2839 self._check_abc_inheritance(self)
2840
2841 def test_abc_inheritance_official(self):
2842 # Test implementations inherit from the official ABCs of the
2843 # baseline "io" module.
2844 self._check_abc_inheritance(io)
2845
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002846 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2847 def test_nonblock_pipe_write_bigbuf(self):
2848 self._test_nonblock_pipe_write(16*1024)
2849
2850 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2851 def test_nonblock_pipe_write_smallbuf(self):
2852 self._test_nonblock_pipe_write(1024)
2853
2854 def _set_non_blocking(self, fd):
2855 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2856 self.assertNotEqual(flags, -1)
2857 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2858 self.assertEqual(res, 0)
2859
2860 def _test_nonblock_pipe_write(self, bufsize):
2861 sent = []
2862 received = []
2863 r, w = os.pipe()
2864 self._set_non_blocking(r)
2865 self._set_non_blocking(w)
2866
2867 # To exercise all code paths in the C implementation we need
2868 # to play with buffer sizes. For instance, if we choose a
2869 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2870 # then we will never get a partial write of the buffer.
2871 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2872 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2873
2874 with rf, wf:
2875 for N in 9999, 73, 7574:
2876 try:
2877 i = 0
2878 while True:
2879 msg = bytes([i % 26 + 97]) * N
2880 sent.append(msg)
2881 wf.write(msg)
2882 i += 1
2883
2884 except self.BlockingIOError as e:
2885 self.assertEqual(e.args[0], errno.EAGAIN)
2886 sent[-1] = sent[-1][:e.characters_written]
2887 received.append(rf.read())
2888 msg = b'BLOCKED'
2889 wf.write(msg)
2890 sent.append(msg)
2891
2892 while True:
2893 try:
2894 wf.flush()
2895 break
2896 except self.BlockingIOError as e:
2897 self.assertEqual(e.args[0], errno.EAGAIN)
2898 self.assertEqual(e.characters_written, 0)
2899 received.append(rf.read())
2900
2901 received += iter(rf.read, None)
2902
2903 sent, received = b''.join(sent), b''.join(received)
2904 self.assertTrue(sent == received)
2905 self.assertTrue(wf.closed)
2906 self.assertTrue(rf.closed)
2907
Antoine Pitrou19690592009-06-12 20:14:08 +00002908class CMiscIOTest(MiscIOTest):
2909 io = io
2910
2911class PyMiscIOTest(MiscIOTest):
2912 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002913
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002914
2915@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2916class SignalsTest(unittest.TestCase):
2917
2918 def setUp(self):
2919 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2920
2921 def tearDown(self):
2922 signal.signal(signal.SIGALRM, self.oldalrm)
2923
2924 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002925 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002926
2927 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002928 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2929 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002930 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2931 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002932 invokes the signal handler, and bubbles up the exception raised
2933 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002934 read_results = []
2935 def _read():
2936 s = os.read(r, 1)
2937 read_results.append(s)
2938 t = threading.Thread(target=_read)
2939 t.daemon = True
2940 r, w = os.pipe()
2941 try:
2942 wio = self.io.open(w, **fdopen_kwargs)
2943 t.start()
2944 signal.alarm(1)
2945 # Fill the pipe enough that the write will be blocking.
2946 # It will be interrupted by the timer armed above. Since the
2947 # other thread has read one byte, the low-level write will
2948 # return with a successful (partial) result rather than an EINTR.
2949 # The buffered IO layer must check for pending signal
2950 # handlers, which in this case will invoke alarm_interrupt().
2951 self.assertRaises(ZeroDivisionError,
Antoine Pitrou68915d72013-04-24 23:31:38 +02002952 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002953 t.join()
2954 # We got one byte, get another one and check that it isn't a
2955 # repeat of the first one.
2956 read_results.append(os.read(r, 1))
2957 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2958 finally:
2959 os.close(w)
2960 os.close(r)
2961 # This is deliberate. If we didn't close the file descriptor
2962 # before closing wio, wio would try to flush its internal
2963 # buffer, and block again.
2964 try:
2965 wio.close()
2966 except IOError as e:
2967 if e.errno != errno.EBADF:
2968 raise
2969
2970 def test_interrupted_write_unbuffered(self):
2971 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2972
2973 def test_interrupted_write_buffered(self):
2974 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2975
2976 def test_interrupted_write_text(self):
2977 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2978
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002979 def check_reentrant_write(self, data, **fdopen_kwargs):
2980 def on_alarm(*args):
2981 # Will be called reentrantly from the same thread
2982 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002983 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002984 signal.signal(signal.SIGALRM, on_alarm)
2985 r, w = os.pipe()
2986 wio = self.io.open(w, **fdopen_kwargs)
2987 try:
2988 signal.alarm(1)
2989 # Either the reentrant call to wio.write() fails with RuntimeError,
2990 # or the signal handler raises ZeroDivisionError.
2991 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2992 while 1:
2993 for i in range(100):
2994 wio.write(data)
2995 wio.flush()
2996 # Make sure the buffer doesn't fill up and block further writes
2997 os.read(r, len(data) * 100)
2998 exc = cm.exception
2999 if isinstance(exc, RuntimeError):
3000 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3001 finally:
3002 wio.close()
3003 os.close(r)
3004
3005 def test_reentrant_write_buffered(self):
3006 self.check_reentrant_write(b"xy", mode="wb")
3007
3008 def test_reentrant_write_text(self):
3009 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3010
Antoine Pitrou6439c002011-02-25 21:35:47 +00003011 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3012 """Check that a buffered read, when it gets interrupted (either
3013 returning a partial result or EINTR), properly invokes the signal
3014 handler and retries if the latter returned successfully."""
3015 r, w = os.pipe()
3016 fdopen_kwargs["closefd"] = False
3017 def alarm_handler(sig, frame):
3018 os.write(w, b"bar")
3019 signal.signal(signal.SIGALRM, alarm_handler)
3020 try:
3021 rio = self.io.open(r, **fdopen_kwargs)
3022 os.write(w, b"foo")
3023 signal.alarm(1)
3024 # Expected behaviour:
3025 # - first raw read() returns partial b"foo"
3026 # - second raw read() returns EINTR
3027 # - third raw read() returns b"bar"
3028 self.assertEqual(decode(rio.read(6)), "foobar")
3029 finally:
3030 rio.close()
3031 os.close(w)
3032 os.close(r)
3033
3034 def test_interrupterd_read_retry_buffered(self):
3035 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3036 mode="rb")
3037
3038 def test_interrupterd_read_retry_text(self):
3039 self.check_interrupted_read_retry(lambda x: x,
3040 mode="r")
3041
3042 @unittest.skipUnless(threading, 'Threading required for this test.')
3043 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3044 """Check that a buffered write, when it gets interrupted (either
3045 returning a partial result or EINTR), properly invokes the signal
3046 handler and retries if the latter returned successfully."""
3047 select = support.import_module("select")
3048 # A quantity that exceeds the buffer size of an anonymous pipe's
3049 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003050 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003051 r, w = os.pipe()
3052 fdopen_kwargs["closefd"] = False
3053 # We need a separate thread to read from the pipe and allow the
3054 # write() to finish. This thread is started after the SIGALRM is
3055 # received (forcing a first EINTR in write()).
3056 read_results = []
3057 write_finished = False
3058 def _read():
3059 while not write_finished:
3060 while r in select.select([r], [], [], 1.0)[0]:
3061 s = os.read(r, 1024)
3062 read_results.append(s)
3063 t = threading.Thread(target=_read)
3064 t.daemon = True
3065 def alarm1(sig, frame):
3066 signal.signal(signal.SIGALRM, alarm2)
3067 signal.alarm(1)
3068 def alarm2(sig, frame):
3069 t.start()
3070 signal.signal(signal.SIGALRM, alarm1)
3071 try:
3072 wio = self.io.open(w, **fdopen_kwargs)
3073 signal.alarm(1)
3074 # Expected behaviour:
3075 # - first raw write() is partial (because of the limited pipe buffer
3076 # and the first alarm)
3077 # - second raw write() returns EINTR (because of the second alarm)
3078 # - subsequent write()s are successful (either partial or complete)
3079 self.assertEqual(N, wio.write(item * N))
3080 wio.flush()
3081 write_finished = True
3082 t.join()
3083 self.assertEqual(N, sum(len(x) for x in read_results))
3084 finally:
3085 write_finished = True
3086 os.close(w)
3087 os.close(r)
3088 # This is deliberate. If we didn't close the file descriptor
3089 # before closing wio, wio would try to flush its internal
3090 # buffer, and could block (in case of failure).
3091 try:
3092 wio.close()
3093 except IOError as e:
3094 if e.errno != errno.EBADF:
3095 raise
3096
3097 def test_interrupterd_write_retry_buffered(self):
3098 self.check_interrupted_write_retry(b"x", mode="wb")
3099
3100 def test_interrupterd_write_retry_text(self):
3101 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3102
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003103
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003104class CSignalsTest(SignalsTest):
3105 io = io
3106
3107class PySignalsTest(SignalsTest):
3108 io = pyio
3109
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003110 # Handling reentrancy issues would slow down _pyio even more, so the
3111 # tests are disabled.
3112 test_reentrant_write_buffered = None
3113 test_reentrant_write_text = None
3114
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003115
Christian Heimes1a6387e2008-03-26 12:49:49 +00003116def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003117 tests = (CIOTest, PyIOTest,
3118 CBufferedReaderTest, PyBufferedReaderTest,
3119 CBufferedWriterTest, PyBufferedWriterTest,
3120 CBufferedRWPairTest, PyBufferedRWPairTest,
3121 CBufferedRandomTest, PyBufferedRandomTest,
3122 StatefulIncrementalDecoderTest,
3123 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3124 CTextIOWrapperTest, PyTextIOWrapperTest,
3125 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003126 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003127 )
3128
3129 # Put the namespaces of the IO module we are testing and some useful mock
3130 # classes in the __dict__ of each test.
3131 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003132 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003133 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3134 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3135 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3136 globs = globals()
3137 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3138 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3139 # Avoid turning open into a bound method.
3140 py_io_ns["open"] = pyio.OpenWrapper
3141 for test in tests:
3142 if test.__name__.startswith("C"):
3143 for name, obj in c_io_ns.items():
3144 setattr(test, name, obj)
3145 elif test.__name__.startswith("Py"):
3146 for name, obj in py_io_ns.items():
3147 setattr(test, name, obj)
3148
3149 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003150
3151if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003152 test_main()