blob: 2a3b4a3b14a32f44972ade9f25aae2949d0f80b7 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
Antoine Pitrou6391b342010-09-14 18:48:19 +000063class MockRawIOWithoutRead:
64 """A RawIO implementation without read(), so as to exercise the default
65 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000066
67 def __init__(self, read_stack=()):
68 self._read_stack = list(read_stack)
69 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000070 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000071 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000072
Christian Heimes1a6387e2008-03-26 12:49:49 +000073 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000074 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000075 return len(b)
76
77 def writable(self):
78 return True
79
80 def fileno(self):
81 return 42
82
83 def readable(self):
84 return True
85
86 def seekable(self):
87 return True
88
89 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000091
92 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # same comment as above
94
95 def readinto(self, buf):
96 self._reads += 1
97 max_len = len(buf)
98 try:
99 data = self._read_stack[0]
100 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000101 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000102 return 0
103 if data is None:
104 del self._read_stack[0]
105 return None
106 n = len(data)
107 if len(data) <= max_len:
108 del self._read_stack[0]
109 buf[:n] = data
110 return n
111 else:
112 buf[:] = data[:max_len]
113 self._read_stack[0] = data[max_len:]
114 return max_len
115
116 def truncate(self, pos=None):
117 return pos
118
Antoine Pitrou6391b342010-09-14 18:48:19 +0000119class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
120 pass
121
122class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
123 pass
124
125
126class MockRawIO(MockRawIOWithoutRead):
127
128 def read(self, n=None):
129 self._reads += 1
130 try:
131 return self._read_stack.pop(0)
132 except:
133 self._extraneous_reads += 1
134 return b""
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class CMockRawIO(MockRawIO, io.RawIOBase):
137 pass
138
139class PyMockRawIO(MockRawIO, pyio.RawIOBase):
140 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
Antoine Pitrou19690592009-06-12 20:14:08 +0000143class MisbehavedRawIO(MockRawIO):
144 def write(self, b):
145 return MockRawIO.write(self, b) * 2
146
147 def read(self, n=None):
148 return MockRawIO.read(self, n) * 2
149
150 def seek(self, pos, whence):
151 return -123
152
153 def tell(self):
154 return -456
155
156 def readinto(self, buf):
157 MockRawIO.readinto(self, buf)
158 return len(buf) * 5
159
160class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
161 pass
162
163class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
164 pass
165
166
167class CloseFailureIO(MockRawIO):
168 closed = 0
169
170 def close(self):
171 if not self.closed:
172 self.closed = 1
173 raise IOError
174
175class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
176 pass
177
178class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
179 pass
180
181
182class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183
184 def __init__(self, data):
185 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
188 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190 self.read_history.append(None if res is None else len(res))
191 return res
192
Antoine Pitrou19690592009-06-12 20:14:08 +0000193 def readinto(self, b):
194 res = super(MockFileIO, self).readinto(b)
195 self.read_history.append(res)
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class CMockFileIO(MockFileIO, io.BytesIO):
199 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class PyMockFileIO(MockFileIO, pyio.BytesIO):
202 pass
203
204
205class MockNonBlockWriterIO:
206
207 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000208 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000209 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000210
Antoine Pitrou19690592009-06-12 20:14:08 +0000211 def pop_written(self):
212 s = b"".join(self._write_stack)
213 self._write_stack[:] = []
214 return s
215
216 def block_on(self, char):
217 """Block when a given char is encountered."""
218 self._blocker_char = char
219
220 def readable(self):
221 return True
222
223 def seekable(self):
224 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
226 def writable(self):
227 return True
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def write(self, b):
230 b = bytes(b)
231 n = -1
232 if self._blocker_char:
233 try:
234 n = b.index(self._blocker_char)
235 except ValueError:
236 pass
237 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100238 if n > 0:
239 # write data up to the first blocker
240 self._write_stack.append(b[:n])
241 return n
242 else:
243 # cancel blocker and indicate would block
244 self._blocker_char = None
245 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000429 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000547 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000567 def test_flush_error_on_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 def bad_flush():
570 raise IOError()
571 f.flush = bad_flush
572 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600573 self.assertTrue(f.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574
575 def test_multi_close(self):
576 f = self.open(support.TESTFN, "wb", buffering=0)
577 f.close()
578 f.close()
579 f.close()
580 self.assertRaises(ValueError, f.flush)
581
Antoine Pitrou6391b342010-09-14 18:48:19 +0000582 def test_RawIOBase_read(self):
583 # Exercise the default RawIOBase.read() implementation (which calls
584 # readinto() internally).
585 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
586 self.assertEqual(rawio.read(2), b"ab")
587 self.assertEqual(rawio.read(2), b"c")
588 self.assertEqual(rawio.read(2), b"d")
589 self.assertEqual(rawio.read(2), None)
590 self.assertEqual(rawio.read(2), b"ef")
591 self.assertEqual(rawio.read(2), b"g")
592 self.assertEqual(rawio.read(2), None)
593 self.assertEqual(rawio.read(2), b"")
594
Hynek Schlawack877effc2012-05-25 09:24:18 +0200595 def test_fileio_closefd(self):
596 # Issue #4841
597 with self.open(__file__, 'rb') as f1, \
598 self.open(__file__, 'rb') as f2:
599 fileio = self.FileIO(f1.fileno(), closefd=False)
600 # .__init__() must not close f1
601 fileio.__init__(f2.fileno(), closefd=False)
602 f1.readline()
603 # .close() must not close f2
604 fileio.close()
605 f2.readline()
606
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300607 def test_nonbuffered_textio(self):
608 with warnings.catch_warnings(record=True) as recorded:
609 with self.assertRaises(ValueError):
610 self.open(support.TESTFN, 'w', buffering=0)
611 support.gc_collect()
612 self.assertEqual(recorded, [])
613
614 def test_invalid_newline(self):
615 with warnings.catch_warnings(record=True) as recorded:
616 with self.assertRaises(ValueError):
617 self.open(support.TESTFN, 'w', newline='invalid')
618 support.gc_collect()
619 self.assertEqual(recorded, [])
620
Hynek Schlawack877effc2012-05-25 09:24:18 +0200621
Antoine Pitrou19690592009-06-12 20:14:08 +0000622class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200623
624 def test_IOBase_finalize(self):
625 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
626 # class which inherits IOBase and an object of this class are caught
627 # in a reference cycle and close() is already in the method cache.
628 class MyIO(self.IOBase):
629 def close(self):
630 pass
631
632 # create an instance to populate the method cache
633 MyIO()
634 obj = MyIO()
635 obj.obj = obj
636 wr = weakref.ref(obj)
637 del MyIO
638 del obj
639 support.gc_collect()
640 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000641
Antoine Pitrou19690592009-06-12 20:14:08 +0000642class PyIOTest(IOTest):
643 test_array_writes = unittest.skip(
644 "len(array.array) returns number of elements rather than bytelength"
645 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000646
647
Antoine Pitrou19690592009-06-12 20:14:08 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
657 def test_fileno(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660
Ezio Melotti2623a372010-11-21 13:34:58 +0000661 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000662
Zachary Ware1f702212013-12-10 14:09:20 -0600663 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000664 def test_no_fileno(self):
665 # XXX will we always have fileno() function? If so, kill
666 # this test. Else, write it.
667 pass
668
669 def test_invalid_args(self):
670 rawio = self.MockRawIO()
671 bufio = self.tp(rawio)
672 # Invalid whence
673 self.assertRaises(ValueError, bufio.seek, 0, -1)
674 self.assertRaises(ValueError, bufio.seek, 0, 3)
675
676 def test_override_destructor(self):
677 tp = self.tp
678 record = []
679 class MyBufferedIO(tp):
680 def __del__(self):
681 record.append(1)
682 try:
683 f = super(MyBufferedIO, self).__del__
684 except AttributeError:
685 pass
686 else:
687 f()
688 def close(self):
689 record.append(2)
690 super(MyBufferedIO, self).close()
691 def flush(self):
692 record.append(3)
693 super(MyBufferedIO, self).flush()
694 rawio = self.MockRawIO()
695 bufio = MyBufferedIO(rawio)
696 writable = bufio.writable()
697 del bufio
698 support.gc_collect()
699 if writable:
700 self.assertEqual(record, [1, 2, 3])
701 else:
702 self.assertEqual(record, [1, 2])
703
704 def test_context_manager(self):
705 # Test usability as a context manager
706 rawio = self.MockRawIO()
707 bufio = self.tp(rawio)
708 def _with():
709 with bufio:
710 pass
711 _with()
712 # bufio should now be closed, and using it a second time should raise
713 # a ValueError.
714 self.assertRaises(ValueError, _with)
715
716 def test_error_through_destructor(self):
717 # Test that the exception state is not modified by a destructor,
718 # even if close() fails.
719 rawio = self.CloseFailureIO()
720 def f():
721 self.tp(rawio).xyzzy
722 with support.captured_output("stderr") as s:
723 self.assertRaises(AttributeError, f)
724 s = s.getvalue().strip()
725 if s:
726 # The destructor *may* have printed an unraisable error, check it
727 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000728 self.assertTrue(s.startswith("Exception IOError: "), s)
729 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000730
731 def test_repr(self):
732 raw = self.MockRawIO()
733 b = self.tp(raw)
734 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
735 self.assertEqual(repr(b), "<%s>" % clsname)
736 raw.name = "dummy"
737 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
738 raw.name = b"dummy"
739 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000740
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000741 def test_flush_error_on_close(self):
742 raw = self.MockRawIO()
743 def bad_flush():
744 raise IOError()
745 raw.flush = bad_flush
746 b = self.tp(raw)
747 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600748 self.assertTrue(b.closed)
749
750 def test_close_error_on_close(self):
751 raw = self.MockRawIO()
752 def bad_flush():
753 raise IOError('flush')
754 def bad_close():
755 raise IOError('close')
756 raw.close = bad_close
757 b = self.tp(raw)
758 b.flush = bad_flush
759 with self.assertRaises(IOError) as err: # exception not swallowed
760 b.close()
761 self.assertEqual(err.exception.args, ('close',))
762 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000763
764 def test_multi_close(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 b.close()
768 b.close()
769 b.close()
770 self.assertRaises(ValueError, b.flush)
771
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000772 def test_readonly_attributes(self):
773 raw = self.MockRawIO()
774 buf = self.tp(raw)
775 x = self.MockRawIO()
776 with self.assertRaises((AttributeError, TypeError)):
777 buf.raw = x
778
Christian Heimes1a6387e2008-03-26 12:49:49 +0000779
Antoine Pitroubff5df02012-07-29 19:02:46 +0200780class SizeofTest:
781
782 @support.cpython_only
783 def test_sizeof(self):
784 bufsize1 = 4096
785 bufsize2 = 8192
786 rawio = self.MockRawIO()
787 bufio = self.tp(rawio, buffer_size=bufsize1)
788 size = sys.getsizeof(bufio) - bufsize1
789 rawio = self.MockRawIO()
790 bufio = self.tp(rawio, buffer_size=bufsize2)
791 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
792
793
Antoine Pitrou19690592009-06-12 20:14:08 +0000794class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
795 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000796
Antoine Pitrou19690592009-06-12 20:14:08 +0000797 def test_constructor(self):
798 rawio = self.MockRawIO([b"abc"])
799 bufio = self.tp(rawio)
800 bufio.__init__(rawio)
801 bufio.__init__(rawio, buffer_size=1024)
802 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000803 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000804 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
805 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
806 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
807 rawio = self.MockRawIO([b"abc"])
808 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000809 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000810
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200811 def test_uninitialized(self):
812 bufio = self.tp.__new__(self.tp)
813 del bufio
814 bufio = self.tp.__new__(self.tp)
815 self.assertRaisesRegexp((ValueError, AttributeError),
816 'uninitialized|has no attribute',
817 bufio.read, 0)
818 bufio.__init__(self.MockRawIO())
819 self.assertEqual(bufio.read(0), b'')
820
Antoine Pitrou19690592009-06-12 20:14:08 +0000821 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000822 for arg in (None, 7):
823 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
824 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000825 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000826 # Invalid args
827 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000828
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 def test_read1(self):
830 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
831 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000832 self.assertEqual(b"a", bufio.read(1))
833 self.assertEqual(b"b", bufio.read1(1))
834 self.assertEqual(rawio._reads, 1)
835 self.assertEqual(b"c", bufio.read1(100))
836 self.assertEqual(rawio._reads, 1)
837 self.assertEqual(b"d", bufio.read1(100))
838 self.assertEqual(rawio._reads, 2)
839 self.assertEqual(b"efg", bufio.read1(100))
840 self.assertEqual(rawio._reads, 3)
841 self.assertEqual(b"", bufio.read1(100))
842 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000843 # Invalid args
844 self.assertRaises(ValueError, bufio.read1, -1)
845
846 def test_readinto(self):
847 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
848 bufio = self.tp(rawio)
849 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000850 self.assertEqual(bufio.readinto(b), 2)
851 self.assertEqual(b, b"ab")
852 self.assertEqual(bufio.readinto(b), 2)
853 self.assertEqual(b, b"cd")
854 self.assertEqual(bufio.readinto(b), 2)
855 self.assertEqual(b, b"ef")
856 self.assertEqual(bufio.readinto(b), 1)
857 self.assertEqual(b, b"gf")
858 self.assertEqual(bufio.readinto(b), 0)
859 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000860
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000861 def test_readlines(self):
862 def bufio():
863 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
864 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000865 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
866 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
867 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000868
Antoine Pitrou19690592009-06-12 20:14:08 +0000869 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000870 data = b"abcdefghi"
871 dlen = len(data)
872
873 tests = [
874 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
875 [ 100, [ 3, 3, 3], [ dlen ] ],
876 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
877 ]
878
879 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000880 rawio = self.MockFileIO(data)
881 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000882 pos = 0
883 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000884 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000885 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000886 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000887 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000888
Antoine Pitrou19690592009-06-12 20:14:08 +0000889 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000890 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000891 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
892 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000893 self.assertEqual(b"abcd", bufio.read(6))
894 self.assertEqual(b"e", bufio.read(1))
895 self.assertEqual(b"fg", bufio.read())
896 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200897 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000898 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000899
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200900 rawio = self.MockRawIO((b"a", None, None))
901 self.assertEqual(b"a", rawio.readall())
902 self.assertIsNone(rawio.readall())
903
Antoine Pitrou19690592009-06-12 20:14:08 +0000904 def test_read_past_eof(self):
905 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
906 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000907
Ezio Melotti2623a372010-11-21 13:34:58 +0000908 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000909
Antoine Pitrou19690592009-06-12 20:14:08 +0000910 def test_read_all(self):
911 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
912 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000913
Ezio Melotti2623a372010-11-21 13:34:58 +0000914 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000915
Victor Stinner6a102812010-04-27 23:55:59 +0000916 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000917 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000918 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000919 try:
920 # Write out many bytes with exactly the same number of 0's,
921 # 1's... 255's. This will help us check that concurrent reading
922 # doesn't duplicate or forget contents.
923 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000924 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000925 random.shuffle(l)
926 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000927 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000928 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000929 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000930 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000931 errors = []
932 results = []
933 def f():
934 try:
935 # Intra-buffer read then buffer-flushing read
936 for n in cycle([1, 19]):
937 s = bufio.read(n)
938 if not s:
939 break
940 # list.append() is atomic
941 results.append(s)
942 except Exception as e:
943 errors.append(e)
944 raise
945 threads = [threading.Thread(target=f) for x in range(20)]
946 for t in threads:
947 t.start()
948 time.sleep(0.02) # yield
949 for t in threads:
950 t.join()
951 self.assertFalse(errors,
952 "the following exceptions were caught: %r" % errors)
953 s = b''.join(results)
954 for i in range(256):
955 c = bytes(bytearray([i]))
956 self.assertEqual(s.count(c), N)
957 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000958 support.unlink(support.TESTFN)
959
960 def test_misbehaved_io(self):
961 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
962 bufio = self.tp(rawio)
963 self.assertRaises(IOError, bufio.seek, 0)
964 self.assertRaises(IOError, bufio.tell)
965
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000966 def test_no_extraneous_read(self):
967 # Issue #9550; when the raw IO object has satisfied the read request,
968 # we should not issue any additional reads, otherwise it may block
969 # (e.g. socket).
970 bufsize = 16
971 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
972 rawio = self.MockRawIO([b"x" * n])
973 bufio = self.tp(rawio, bufsize)
974 self.assertEqual(bufio.read(n), b"x" * n)
975 # Simple case: one raw read is enough to satisfy the request.
976 self.assertEqual(rawio._extraneous_reads, 0,
977 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
978 # A more complex case where two raw reads are needed to satisfy
979 # the request.
980 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
981 bufio = self.tp(rawio, bufsize)
982 self.assertEqual(bufio.read(n), b"x" * n)
983 self.assertEqual(rawio._extraneous_reads, 0,
984 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
985
986
Antoine Pitroubff5df02012-07-29 19:02:46 +0200987class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +0000988 tp = io.BufferedReader
989
990 def test_constructor(self):
991 BufferedReaderTest.test_constructor(self)
992 # The allocation can succeed on 32-bit builds, e.g. with more
993 # than 2GB RAM and a 64-bit kernel.
994 if sys.maxsize > 0x7FFFFFFF:
995 rawio = self.MockRawIO()
996 bufio = self.tp(rawio)
997 self.assertRaises((OverflowError, MemoryError, ValueError),
998 bufio.__init__, rawio, sys.maxsize)
999
1000 def test_initialization(self):
1001 rawio = self.MockRawIO([b"abc"])
1002 bufio = self.tp(rawio)
1003 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1004 self.assertRaises(ValueError, bufio.read)
1005 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1006 self.assertRaises(ValueError, bufio.read)
1007 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1008 self.assertRaises(ValueError, bufio.read)
1009
1010 def test_misbehaved_io_read(self):
1011 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1012 bufio = self.tp(rawio)
1013 # _pyio.BufferedReader seems to implement reading different, so that
1014 # checking this is not so easy.
1015 self.assertRaises(IOError, bufio.read, 10)
1016
1017 def test_garbage_collection(self):
1018 # C BufferedReader objects are collected.
1019 # The Python version has __del__, so it ends into gc.garbage instead
1020 rawio = self.FileIO(support.TESTFN, "w+b")
1021 f = self.tp(rawio)
1022 f.f = f
1023 wr = weakref.ref(f)
1024 del f
1025 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001026 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001027
R David Murray5b2cf5e2013-02-23 22:11:21 -05001028 def test_args_error(self):
1029 # Issue #17275
1030 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1031 self.tp(io.BytesIO(), 1024, 1024, 1024)
1032
1033
Antoine Pitrou19690592009-06-12 20:14:08 +00001034class PyBufferedReaderTest(BufferedReaderTest):
1035 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001036
1037
Antoine Pitrou19690592009-06-12 20:14:08 +00001038class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1039 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001040
Antoine Pitrou19690592009-06-12 20:14:08 +00001041 def test_constructor(self):
1042 rawio = self.MockRawIO()
1043 bufio = self.tp(rawio)
1044 bufio.__init__(rawio)
1045 bufio.__init__(rawio, buffer_size=1024)
1046 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001047 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001048 bufio.flush()
1049 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1050 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1051 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1052 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001053 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001054 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001055 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001056
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001057 def test_uninitialized(self):
1058 bufio = self.tp.__new__(self.tp)
1059 del bufio
1060 bufio = self.tp.__new__(self.tp)
1061 self.assertRaisesRegexp((ValueError, AttributeError),
1062 'uninitialized|has no attribute',
1063 bufio.write, b'')
1064 bufio.__init__(self.MockRawIO())
1065 self.assertEqual(bufio.write(b''), 0)
1066
Antoine Pitrou19690592009-06-12 20:14:08 +00001067 def test_detach_flush(self):
1068 raw = self.MockRawIO()
1069 buf = self.tp(raw)
1070 buf.write(b"howdy!")
1071 self.assertFalse(raw._write_stack)
1072 buf.detach()
1073 self.assertEqual(raw._write_stack, [b"howdy!"])
1074
1075 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001076 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 writer = self.MockRawIO()
1078 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001079 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001080 self.assertFalse(writer._write_stack)
1081
Antoine Pitrou19690592009-06-12 20:14:08 +00001082 def test_write_overflow(self):
1083 writer = self.MockRawIO()
1084 bufio = self.tp(writer, 8)
1085 contents = b"abcdefghijklmnop"
1086 for n in range(0, len(contents), 3):
1087 bufio.write(contents[n:n+3])
1088 flushed = b"".join(writer._write_stack)
1089 # At least (total - 8) bytes were implicitly flushed, perhaps more
1090 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001091 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001092
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 def check_writes(self, intermediate_func):
1094 # Lots of writes, test the flushed output is as expected.
1095 contents = bytes(range(256)) * 1000
1096 n = 0
1097 writer = self.MockRawIO()
1098 bufio = self.tp(writer, 13)
1099 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1100 def gen_sizes():
1101 for size in count(1):
1102 for i in range(15):
1103 yield size
1104 sizes = gen_sizes()
1105 while n < len(contents):
1106 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001107 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001108 intermediate_func(bufio)
1109 n += size
1110 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001111 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001112 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001113
Antoine Pitrou19690592009-06-12 20:14:08 +00001114 def test_writes(self):
1115 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001116
Antoine Pitrou19690592009-06-12 20:14:08 +00001117 def test_writes_and_flushes(self):
1118 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001119
Antoine Pitrou19690592009-06-12 20:14:08 +00001120 def test_writes_and_seeks(self):
1121 def _seekabs(bufio):
1122 pos = bufio.tell()
1123 bufio.seek(pos + 1, 0)
1124 bufio.seek(pos - 1, 0)
1125 bufio.seek(pos, 0)
1126 self.check_writes(_seekabs)
1127 def _seekrel(bufio):
1128 pos = bufio.seek(0, 1)
1129 bufio.seek(+1, 1)
1130 bufio.seek(-1, 1)
1131 bufio.seek(pos, 0)
1132 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001133
Antoine Pitrou19690592009-06-12 20:14:08 +00001134 def test_writes_and_truncates(self):
1135 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001136
Antoine Pitrou19690592009-06-12 20:14:08 +00001137 def test_write_non_blocking(self):
1138 raw = self.MockNonBlockWriterIO()
1139 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001140
Ezio Melotti2623a372010-11-21 13:34:58 +00001141 self.assertEqual(bufio.write(b"abcd"), 4)
1142 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001143 # 1 byte will be written, the rest will be buffered
1144 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001145 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001146
Antoine Pitrou19690592009-06-12 20:14:08 +00001147 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1148 raw.block_on(b"0")
1149 try:
1150 bufio.write(b"opqrwxyz0123456789")
1151 except self.BlockingIOError as e:
1152 written = e.characters_written
1153 else:
1154 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001155 self.assertEqual(written, 16)
1156 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001157 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001158
Ezio Melotti2623a372010-11-21 13:34:58 +00001159 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001160 s = raw.pop_written()
1161 # Previously buffered bytes were flushed
1162 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001163
Antoine Pitrou19690592009-06-12 20:14:08 +00001164 def test_write_and_rewind(self):
1165 raw = io.BytesIO()
1166 bufio = self.tp(raw, 4)
1167 self.assertEqual(bufio.write(b"abcdef"), 6)
1168 self.assertEqual(bufio.tell(), 6)
1169 bufio.seek(0, 0)
1170 self.assertEqual(bufio.write(b"XY"), 2)
1171 bufio.seek(6, 0)
1172 self.assertEqual(raw.getvalue(), b"XYcdef")
1173 self.assertEqual(bufio.write(b"123456"), 6)
1174 bufio.flush()
1175 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001176
Antoine Pitrou19690592009-06-12 20:14:08 +00001177 def test_flush(self):
1178 writer = self.MockRawIO()
1179 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001180 bufio.write(b"abc")
1181 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001182 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001183
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001184 def test_writelines(self):
1185 l = [b'ab', b'cd', b'ef']
1186 writer = self.MockRawIO()
1187 bufio = self.tp(writer, 8)
1188 bufio.writelines(l)
1189 bufio.flush()
1190 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1191
1192 def test_writelines_userlist(self):
1193 l = UserList([b'ab', b'cd', b'ef'])
1194 writer = self.MockRawIO()
1195 bufio = self.tp(writer, 8)
1196 bufio.writelines(l)
1197 bufio.flush()
1198 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1199
1200 def test_writelines_error(self):
1201 writer = self.MockRawIO()
1202 bufio = self.tp(writer, 8)
1203 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1204 self.assertRaises(TypeError, bufio.writelines, None)
1205
Antoine Pitrou19690592009-06-12 20:14:08 +00001206 def test_destructor(self):
1207 writer = self.MockRawIO()
1208 bufio = self.tp(writer, 8)
1209 bufio.write(b"abc")
1210 del bufio
1211 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001212 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001213
1214 def test_truncate(self):
1215 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001216 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001217 bufio = self.tp(raw, 8)
1218 bufio.write(b"abcdef")
1219 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001220 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001221 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001222 self.assertEqual(f.read(), b"abc")
1223
Victor Stinner6a102812010-04-27 23:55:59 +00001224 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001225 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001226 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001227 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001228 # Write out many bytes from many threads and test they were
1229 # all flushed.
1230 N = 1000
1231 contents = bytes(range(256)) * N
1232 sizes = cycle([1, 19])
1233 n = 0
1234 queue = deque()
1235 while n < len(contents):
1236 size = next(sizes)
1237 queue.append(contents[n:n+size])
1238 n += size
1239 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001240 # We use a real file object because it allows us to
1241 # exercise situations where the GIL is released before
1242 # writing the buffer to the raw streams. This is in addition
1243 # to concurrency issues due to switching threads in the middle
1244 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001245 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001246 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001247 errors = []
1248 def f():
1249 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001250 while True:
1251 try:
1252 s = queue.popleft()
1253 except IndexError:
1254 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001255 bufio.write(s)
1256 except Exception as e:
1257 errors.append(e)
1258 raise
1259 threads = [threading.Thread(target=f) for x in range(20)]
1260 for t in threads:
1261 t.start()
1262 time.sleep(0.02) # yield
1263 for t in threads:
1264 t.join()
1265 self.assertFalse(errors,
1266 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001267 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001268 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001269 s = f.read()
1270 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001271 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001272 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001273 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001274
Antoine Pitrou19690592009-06-12 20:14:08 +00001275 def test_misbehaved_io(self):
1276 rawio = self.MisbehavedRawIO()
1277 bufio = self.tp(rawio, 5)
1278 self.assertRaises(IOError, bufio.seek, 0)
1279 self.assertRaises(IOError, bufio.tell)
1280 self.assertRaises(IOError, bufio.write, b"abcdef")
1281
1282 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001283 with support.check_warnings(("max_buffer_size is deprecated",
1284 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001285 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001286
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001287 def test_write_error_on_close(self):
1288 raw = self.MockRawIO()
1289 def bad_write(b):
1290 raise IOError()
1291 raw.write = bad_write
1292 b = self.tp(raw)
1293 b.write(b'spam')
1294 self.assertRaises(IOError, b.close) # exception not swallowed
1295 self.assertTrue(b.closed)
1296
Antoine Pitrou19690592009-06-12 20:14:08 +00001297
Antoine Pitroubff5df02012-07-29 19:02:46 +02001298class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001299 tp = io.BufferedWriter
1300
1301 def test_constructor(self):
1302 BufferedWriterTest.test_constructor(self)
1303 # The allocation can succeed on 32-bit builds, e.g. with more
1304 # than 2GB RAM and a 64-bit kernel.
1305 if sys.maxsize > 0x7FFFFFFF:
1306 rawio = self.MockRawIO()
1307 bufio = self.tp(rawio)
1308 self.assertRaises((OverflowError, MemoryError, ValueError),
1309 bufio.__init__, rawio, sys.maxsize)
1310
1311 def test_initialization(self):
1312 rawio = self.MockRawIO()
1313 bufio = self.tp(rawio)
1314 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1315 self.assertRaises(ValueError, bufio.write, b"def")
1316 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1317 self.assertRaises(ValueError, bufio.write, b"def")
1318 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1319 self.assertRaises(ValueError, bufio.write, b"def")
1320
1321 def test_garbage_collection(self):
1322 # C BufferedWriter objects are collected, and collecting them flushes
1323 # all data to disk.
1324 # The Python version has __del__, so it ends into gc.garbage instead
1325 rawio = self.FileIO(support.TESTFN, "w+b")
1326 f = self.tp(rawio)
1327 f.write(b"123xxx")
1328 f.x = f
1329 wr = weakref.ref(f)
1330 del f
1331 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001332 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001333 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001334 self.assertEqual(f.read(), b"123xxx")
1335
R David Murray5b2cf5e2013-02-23 22:11:21 -05001336 def test_args_error(self):
1337 # Issue #17275
1338 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1339 self.tp(io.BytesIO(), 1024, 1024, 1024)
1340
Antoine Pitrou19690592009-06-12 20:14:08 +00001341
1342class PyBufferedWriterTest(BufferedWriterTest):
1343 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001344
1345class BufferedRWPairTest(unittest.TestCase):
1346
Antoine Pitrou19690592009-06-12 20:14:08 +00001347 def test_constructor(self):
1348 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001349 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001350
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001351 def test_uninitialized(self):
1352 pair = self.tp.__new__(self.tp)
1353 del pair
1354 pair = self.tp.__new__(self.tp)
1355 self.assertRaisesRegexp((ValueError, AttributeError),
1356 'uninitialized|has no attribute',
1357 pair.read, 0)
1358 self.assertRaisesRegexp((ValueError, AttributeError),
1359 'uninitialized|has no attribute',
1360 pair.write, b'')
1361 pair.__init__(self.MockRawIO(), self.MockRawIO())
1362 self.assertEqual(pair.read(0), b'')
1363 self.assertEqual(pair.write(b''), 0)
1364
Antoine Pitrou19690592009-06-12 20:14:08 +00001365 def test_detach(self):
1366 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1367 self.assertRaises(self.UnsupportedOperation, pair.detach)
1368
1369 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001370 with support.check_warnings(("max_buffer_size is deprecated",
1371 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001372 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001373
1374 def test_constructor_with_not_readable(self):
1375 class NotReadable(MockRawIO):
1376 def readable(self):
1377 return False
1378
1379 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1380
1381 def test_constructor_with_not_writeable(self):
1382 class NotWriteable(MockRawIO):
1383 def writable(self):
1384 return False
1385
1386 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1387
1388 def test_read(self):
1389 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1390
1391 self.assertEqual(pair.read(3), b"abc")
1392 self.assertEqual(pair.read(1), b"d")
1393 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001394 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1395 self.assertEqual(pair.read(None), b"abc")
1396
1397 def test_readlines(self):
1398 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1399 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1400 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1401 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001402
1403 def test_read1(self):
1404 # .read1() is delegated to the underlying reader object, so this test
1405 # can be shallow.
1406 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1407
1408 self.assertEqual(pair.read1(3), b"abc")
1409
1410 def test_readinto(self):
1411 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1412
1413 data = bytearray(5)
1414 self.assertEqual(pair.readinto(data), 5)
1415 self.assertEqual(data, b"abcde")
1416
1417 def test_write(self):
1418 w = self.MockRawIO()
1419 pair = self.tp(self.MockRawIO(), w)
1420
1421 pair.write(b"abc")
1422 pair.flush()
1423 pair.write(b"def")
1424 pair.flush()
1425 self.assertEqual(w._write_stack, [b"abc", b"def"])
1426
1427 def test_peek(self):
1428 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1429
1430 self.assertTrue(pair.peek(3).startswith(b"abc"))
1431 self.assertEqual(pair.read(3), b"abc")
1432
1433 def test_readable(self):
1434 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1435 self.assertTrue(pair.readable())
1436
1437 def test_writeable(self):
1438 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1439 self.assertTrue(pair.writable())
1440
1441 def test_seekable(self):
1442 # BufferedRWPairs are never seekable, even if their readers and writers
1443 # are.
1444 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1445 self.assertFalse(pair.seekable())
1446
1447 # .flush() is delegated to the underlying writer object and has been
1448 # tested in the test_write method.
1449
1450 def test_close_and_closed(self):
1451 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1452 self.assertFalse(pair.closed)
1453 pair.close()
1454 self.assertTrue(pair.closed)
1455
1456 def test_isatty(self):
1457 class SelectableIsAtty(MockRawIO):
1458 def __init__(self, isatty):
1459 MockRawIO.__init__(self)
1460 self._isatty = isatty
1461
1462 def isatty(self):
1463 return self._isatty
1464
1465 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1466 self.assertFalse(pair.isatty())
1467
1468 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1469 self.assertTrue(pair.isatty())
1470
1471 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1472 self.assertTrue(pair.isatty())
1473
1474 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1475 self.assertTrue(pair.isatty())
1476
Benjamin Peterson1c873bf2014-09-29 22:46:57 -04001477 def test_weakref_clearing(self):
1478 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1479 ref = weakref.ref(brw)
1480 brw = None
1481 ref = None # Shouldn't segfault.
1482
Antoine Pitrou19690592009-06-12 20:14:08 +00001483class CBufferedRWPairTest(BufferedRWPairTest):
1484 tp = io.BufferedRWPair
1485
1486class PyBufferedRWPairTest(BufferedRWPairTest):
1487 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001488
1489
Antoine Pitrou19690592009-06-12 20:14:08 +00001490class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1491 read_mode = "rb+"
1492 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001493
Antoine Pitrou19690592009-06-12 20:14:08 +00001494 def test_constructor(self):
1495 BufferedReaderTest.test_constructor(self)
1496 BufferedWriterTest.test_constructor(self)
1497
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001498 def test_uninitialized(self):
1499 BufferedReaderTest.test_uninitialized(self)
1500 BufferedWriterTest.test_uninitialized(self)
1501
Antoine Pitrou19690592009-06-12 20:14:08 +00001502 def test_read_and_write(self):
1503 raw = self.MockRawIO((b"asdf", b"ghjk"))
1504 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001505
1506 self.assertEqual(b"as", rw.read(2))
1507 rw.write(b"ddd")
1508 rw.write(b"eee")
1509 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001510 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001511 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001512
Antoine Pitrou19690592009-06-12 20:14:08 +00001513 def test_seek_and_tell(self):
1514 raw = self.BytesIO(b"asdfghjkl")
1515 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001516
Ezio Melotti2623a372010-11-21 13:34:58 +00001517 self.assertEqual(b"as", rw.read(2))
1518 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001519 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001520 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001521
Antoine Pitrou808cec52011-08-20 15:40:58 +02001522 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001523 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001524 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001525 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001526 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001527 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001528 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001529 self.assertEqual(7, rw.tell())
1530 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001531 rw.flush()
1532 self.assertEqual(b"asdf123fl", raw.getvalue())
1533
Christian Heimes1a6387e2008-03-26 12:49:49 +00001534 self.assertRaises(TypeError, rw.seek, 0.0)
1535
Antoine Pitrou19690592009-06-12 20:14:08 +00001536 def check_flush_and_read(self, read_func):
1537 raw = self.BytesIO(b"abcdefghi")
1538 bufio = self.tp(raw)
1539
Ezio Melotti2623a372010-11-21 13:34:58 +00001540 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001541 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001542 self.assertEqual(b"ef", read_func(bufio, 2))
1543 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001544 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001545 self.assertEqual(6, bufio.tell())
1546 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001547 raw.seek(0, 0)
1548 raw.write(b"XYZ")
1549 # flush() resets the read buffer
1550 bufio.flush()
1551 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001552 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001553
1554 def test_flush_and_read(self):
1555 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1556
1557 def test_flush_and_readinto(self):
1558 def _readinto(bufio, n=-1):
1559 b = bytearray(n if n >= 0 else 9999)
1560 n = bufio.readinto(b)
1561 return bytes(b[:n])
1562 self.check_flush_and_read(_readinto)
1563
1564 def test_flush_and_peek(self):
1565 def _peek(bufio, n=-1):
1566 # This relies on the fact that the buffer can contain the whole
1567 # raw stream, otherwise peek() can return less.
1568 b = bufio.peek(n)
1569 if n != -1:
1570 b = b[:n]
1571 bufio.seek(len(b), 1)
1572 return b
1573 self.check_flush_and_read(_peek)
1574
1575 def test_flush_and_write(self):
1576 raw = self.BytesIO(b"abcdefghi")
1577 bufio = self.tp(raw)
1578
1579 bufio.write(b"123")
1580 bufio.flush()
1581 bufio.write(b"45")
1582 bufio.flush()
1583 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001584 self.assertEqual(b"12345fghi", raw.getvalue())
1585 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001586
1587 def test_threads(self):
1588 BufferedReaderTest.test_threads(self)
1589 BufferedWriterTest.test_threads(self)
1590
1591 def test_writes_and_peek(self):
1592 def _peek(bufio):
1593 bufio.peek(1)
1594 self.check_writes(_peek)
1595 def _peek(bufio):
1596 pos = bufio.tell()
1597 bufio.seek(-1, 1)
1598 bufio.peek(1)
1599 bufio.seek(pos, 0)
1600 self.check_writes(_peek)
1601
1602 def test_writes_and_reads(self):
1603 def _read(bufio):
1604 bufio.seek(-1, 1)
1605 bufio.read(1)
1606 self.check_writes(_read)
1607
1608 def test_writes_and_read1s(self):
1609 def _read1(bufio):
1610 bufio.seek(-1, 1)
1611 bufio.read1(1)
1612 self.check_writes(_read1)
1613
1614 def test_writes_and_readintos(self):
1615 def _read(bufio):
1616 bufio.seek(-1, 1)
1617 bufio.readinto(bytearray(1))
1618 self.check_writes(_read)
1619
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001620 def test_write_after_readahead(self):
1621 # Issue #6629: writing after the buffer was filled by readahead should
1622 # first rewind the raw stream.
1623 for overwrite_size in [1, 5]:
1624 raw = self.BytesIO(b"A" * 10)
1625 bufio = self.tp(raw, 4)
1626 # Trigger readahead
1627 self.assertEqual(bufio.read(1), b"A")
1628 self.assertEqual(bufio.tell(), 1)
1629 # Overwriting should rewind the raw stream if it needs so
1630 bufio.write(b"B" * overwrite_size)
1631 self.assertEqual(bufio.tell(), overwrite_size + 1)
1632 # If the write size was smaller than the buffer size, flush() and
1633 # check that rewind happens.
1634 bufio.flush()
1635 self.assertEqual(bufio.tell(), overwrite_size + 1)
1636 s = raw.getvalue()
1637 self.assertEqual(s,
1638 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1639
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001640 def test_write_rewind_write(self):
1641 # Various combinations of reading / writing / seeking backwards / writing again
1642 def mutate(bufio, pos1, pos2):
1643 assert pos2 >= pos1
1644 # Fill the buffer
1645 bufio.seek(pos1)
1646 bufio.read(pos2 - pos1)
1647 bufio.write(b'\x02')
1648 # This writes earlier than the previous write, but still inside
1649 # the buffer.
1650 bufio.seek(pos1)
1651 bufio.write(b'\x01')
1652
1653 b = b"\x80\x81\x82\x83\x84"
1654 for i in range(0, len(b)):
1655 for j in range(i, len(b)):
1656 raw = self.BytesIO(b)
1657 bufio = self.tp(raw, 100)
1658 mutate(bufio, i, j)
1659 bufio.flush()
1660 expected = bytearray(b)
1661 expected[j] = 2
1662 expected[i] = 1
1663 self.assertEqual(raw.getvalue(), expected,
1664 "failed result for i=%d, j=%d" % (i, j))
1665
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001666 def test_truncate_after_read_or_write(self):
1667 raw = self.BytesIO(b"A" * 10)
1668 bufio = self.tp(raw, 100)
1669 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1670 self.assertEqual(bufio.truncate(), 2)
1671 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1672 self.assertEqual(bufio.truncate(), 4)
1673
Antoine Pitrou19690592009-06-12 20:14:08 +00001674 def test_misbehaved_io(self):
1675 BufferedReaderTest.test_misbehaved_io(self)
1676 BufferedWriterTest.test_misbehaved_io(self)
1677
Antoine Pitrou808cec52011-08-20 15:40:58 +02001678 def test_interleaved_read_write(self):
1679 # Test for issue #12213
1680 with self.BytesIO(b'abcdefgh') as raw:
1681 with self.tp(raw, 100) as f:
1682 f.write(b"1")
1683 self.assertEqual(f.read(1), b'b')
1684 f.write(b'2')
1685 self.assertEqual(f.read1(1), b'd')
1686 f.write(b'3')
1687 buf = bytearray(1)
1688 f.readinto(buf)
1689 self.assertEqual(buf, b'f')
1690 f.write(b'4')
1691 self.assertEqual(f.peek(1), b'h')
1692 f.flush()
1693 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1694
1695 with self.BytesIO(b'abc') as raw:
1696 with self.tp(raw, 100) as f:
1697 self.assertEqual(f.read(1), b'a')
1698 f.write(b"2")
1699 self.assertEqual(f.read(1), b'c')
1700 f.flush()
1701 self.assertEqual(raw.getvalue(), b'a2c')
1702
1703 def test_interleaved_readline_write(self):
1704 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1705 with self.tp(raw) as f:
1706 f.write(b'1')
1707 self.assertEqual(f.readline(), b'b\n')
1708 f.write(b'2')
1709 self.assertEqual(f.readline(), b'def\n')
1710 f.write(b'3')
1711 self.assertEqual(f.readline(), b'\n')
1712 f.flush()
1713 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1714
R David Murray5b2cf5e2013-02-23 22:11:21 -05001715
Antoine Pitroubff5df02012-07-29 19:02:46 +02001716class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1717 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001718 tp = io.BufferedRandom
1719
1720 def test_constructor(self):
1721 BufferedRandomTest.test_constructor(self)
1722 # The allocation can succeed on 32-bit builds, e.g. with more
1723 # than 2GB RAM and a 64-bit kernel.
1724 if sys.maxsize > 0x7FFFFFFF:
1725 rawio = self.MockRawIO()
1726 bufio = self.tp(rawio)
1727 self.assertRaises((OverflowError, MemoryError, ValueError),
1728 bufio.__init__, rawio, sys.maxsize)
1729
1730 def test_garbage_collection(self):
1731 CBufferedReaderTest.test_garbage_collection(self)
1732 CBufferedWriterTest.test_garbage_collection(self)
1733
R David Murray5b2cf5e2013-02-23 22:11:21 -05001734 def test_args_error(self):
1735 # Issue #17275
1736 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1737 self.tp(io.BytesIO(), 1024, 1024, 1024)
1738
1739
Antoine Pitrou19690592009-06-12 20:14:08 +00001740class PyBufferedRandomTest(BufferedRandomTest):
1741 tp = pyio.BufferedRandom
1742
1743
Christian Heimes1a6387e2008-03-26 12:49:49 +00001744# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1745# properties:
1746# - A single output character can correspond to many bytes of input.
1747# - The number of input bytes to complete the character can be
1748# undetermined until the last input byte is received.
1749# - The number of input bytes can vary depending on previous input.
1750# - A single input byte can correspond to many characters of output.
1751# - The number of output characters can be undetermined until the
1752# last input byte is received.
1753# - The number of output characters can vary depending on previous input.
1754
1755class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1756 """
1757 For testing seek/tell behavior with a stateful, buffering decoder.
1758
1759 Input is a sequence of words. Words may be fixed-length (length set
1760 by input) or variable-length (period-terminated). In variable-length
1761 mode, extra periods are ignored. Possible words are:
1762 - 'i' followed by a number sets the input length, I (maximum 99).
1763 When I is set to 0, words are space-terminated.
1764 - 'o' followed by a number sets the output length, O (maximum 99).
1765 - Any other word is converted into a word followed by a period on
1766 the output. The output word consists of the input word truncated
1767 or padded out with hyphens to make its length equal to O. If O
1768 is 0, the word is output verbatim without truncating or padding.
1769 I and O are initially set to 1. When I changes, any buffered input is
1770 re-scanned according to the new I. EOF also terminates the last word.
1771 """
1772
1773 def __init__(self, errors='strict'):
1774 codecs.IncrementalDecoder.__init__(self, errors)
1775 self.reset()
1776
1777 def __repr__(self):
1778 return '<SID %x>' % id(self)
1779
1780 def reset(self):
1781 self.i = 1
1782 self.o = 1
1783 self.buffer = bytearray()
1784
1785 def getstate(self):
1786 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1787 return bytes(self.buffer), i*100 + o
1788
1789 def setstate(self, state):
1790 buffer, io = state
1791 self.buffer = bytearray(buffer)
1792 i, o = divmod(io, 100)
1793 self.i, self.o = i ^ 1, o ^ 1
1794
1795 def decode(self, input, final=False):
1796 output = ''
1797 for b in input:
1798 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001799 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001800 if self.buffer:
1801 output += self.process_word()
1802 else:
1803 self.buffer.append(b)
1804 else: # fixed-length, terminate after self.i bytes
1805 self.buffer.append(b)
1806 if len(self.buffer) == self.i:
1807 output += self.process_word()
1808 if final and self.buffer: # EOF terminates the last word
1809 output += self.process_word()
1810 return output
1811
1812 def process_word(self):
1813 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001814 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001815 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001816 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001817 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1818 else:
1819 output = self.buffer.decode('ascii')
1820 if len(output) < self.o:
1821 output += '-'*self.o # pad out with hyphens
1822 if self.o:
1823 output = output[:self.o] # truncate to output length
1824 output += '.'
1825 self.buffer = bytearray()
1826 return output
1827
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001828 codecEnabled = False
1829
1830 @classmethod
1831 def lookupTestDecoder(cls, name):
1832 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001833 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001834 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001835 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001836 incrementalencoder=None,
1837 streamreader=None, streamwriter=None,
1838 incrementaldecoder=cls)
1839
1840# Register the previous decoder for testing.
1841# Disabled by default, tests will enable it.
1842codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1843
1844
Christian Heimes1a6387e2008-03-26 12:49:49 +00001845class StatefulIncrementalDecoderTest(unittest.TestCase):
1846 """
1847 Make sure the StatefulIncrementalDecoder actually works.
1848 """
1849
1850 test_cases = [
1851 # I=1, O=1 (fixed-length input == fixed-length output)
1852 (b'abcd', False, 'a.b.c.d.'),
1853 # I=0, O=0 (variable-length input, variable-length output)
1854 (b'oiabcd', True, 'abcd.'),
1855 # I=0, O=0 (should ignore extra periods)
1856 (b'oi...abcd...', True, 'abcd.'),
1857 # I=0, O=6 (variable-length input, fixed-length output)
1858 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1859 # I=2, O=6 (fixed-length input < fixed-length output)
1860 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1861 # I=6, O=3 (fixed-length input > fixed-length output)
1862 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1863 # I=0, then 3; O=29, then 15 (with longer output)
1864 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1865 'a----------------------------.' +
1866 'b----------------------------.' +
1867 'cde--------------------------.' +
1868 'abcdefghijabcde.' +
1869 'a.b------------.' +
1870 '.c.------------.' +
1871 'd.e------------.' +
1872 'k--------------.' +
1873 'l--------------.' +
1874 'm--------------.')
1875 ]
1876
Antoine Pitrou19690592009-06-12 20:14:08 +00001877 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001878 # Try a few one-shot test cases.
1879 for input, eof, output in self.test_cases:
1880 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001881 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882
1883 # Also test an unfinished decode, followed by forcing EOF.
1884 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001885 self.assertEqual(d.decode(b'oiabcd'), '')
1886 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887
1888class TextIOWrapperTest(unittest.TestCase):
1889
1890 def setUp(self):
1891 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1892 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001893 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001894
1895 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001896 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001897
Antoine Pitrou19690592009-06-12 20:14:08 +00001898 def test_constructor(self):
1899 r = self.BytesIO(b"\xc3\xa9\n\n")
1900 b = self.BufferedReader(r, 1000)
1901 t = self.TextIOWrapper(b)
1902 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001903 self.assertEqual(t.encoding, "latin1")
1904 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001905 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001906 self.assertEqual(t.encoding, "utf8")
1907 self.assertEqual(t.line_buffering, True)
1908 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001909 self.assertRaises(TypeError, t.__init__, b, newline=42)
1910 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1911
1912 def test_detach(self):
1913 r = self.BytesIO()
1914 b = self.BufferedWriter(r)
1915 t = self.TextIOWrapper(b)
1916 self.assertIs(t.detach(), b)
1917
1918 t = self.TextIOWrapper(b, encoding="ascii")
1919 t.write("howdy")
1920 self.assertFalse(r.getvalue())
1921 t.detach()
1922 self.assertEqual(r.getvalue(), b"howdy")
1923 self.assertRaises(ValueError, t.detach)
1924
1925 def test_repr(self):
1926 raw = self.BytesIO("hello".encode("utf-8"))
1927 b = self.BufferedReader(raw)
1928 t = self.TextIOWrapper(b, encoding="utf-8")
1929 modname = self.TextIOWrapper.__module__
1930 self.assertEqual(repr(t),
1931 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1932 raw.name = "dummy"
1933 self.assertEqual(repr(t),
1934 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1935 raw.name = b"dummy"
1936 self.assertEqual(repr(t),
1937 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1938
1939 def test_line_buffering(self):
1940 r = self.BytesIO()
1941 b = self.BufferedWriter(r, 1000)
1942 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1943 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001944 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001945 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001946 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001947 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001948 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001949
Antoine Pitrou19690592009-06-12 20:14:08 +00001950 def test_encoding(self):
1951 # Check the encoding attribute is always set, and valid
1952 b = self.BytesIO()
1953 t = self.TextIOWrapper(b, encoding="utf8")
1954 self.assertEqual(t.encoding, "utf8")
1955 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001956 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001957 codecs.lookup(t.encoding)
1958
1959 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001960 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001961 b = self.BytesIO(b"abc\n\xff\n")
1962 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001963 self.assertRaises(UnicodeError, t.read)
1964 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001965 b = self.BytesIO(b"abc\n\xff\n")
1966 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001967 self.assertRaises(UnicodeError, t.read)
1968 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001969 b = self.BytesIO(b"abc\n\xff\n")
1970 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001971 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001973 b = self.BytesIO(b"abc\n\xff\n")
1974 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001975 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001976
Antoine Pitrou19690592009-06-12 20:14:08 +00001977 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001979 b = self.BytesIO()
1980 t = self.TextIOWrapper(b, encoding="ascii")
1981 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001982 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001983 b = self.BytesIO()
1984 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1985 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001986 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001987 b = self.BytesIO()
1988 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001989 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001991 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001992 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001993 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 b = self.BytesIO()
1995 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001996 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001999 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000
Antoine Pitrou19690592009-06-12 20:14:08 +00002001 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002002 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2003
2004 tests = [
2005 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2006 [ '', input_lines ],
2007 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2008 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2009 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2010 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002011 encodings = (
2012 'utf-8', 'latin-1',
2013 'utf-16', 'utf-16-le', 'utf-16-be',
2014 'utf-32', 'utf-32-le', 'utf-32-be',
2015 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002016
2017 # Try a range of buffer sizes to test the case where \r is the last
2018 # character in TextIOWrapper._pending_line.
2019 for encoding in encodings:
2020 # XXX: str.encode() should return bytes
2021 data = bytes(''.join(input_lines).encode(encoding))
2022 for do_reads in (False, True):
2023 for bufsize in range(1, 10):
2024 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002025 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2026 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027 encoding=encoding)
2028 if do_reads:
2029 got_lines = []
2030 while True:
2031 c2 = textio.read(2)
2032 if c2 == '':
2033 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002034 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002035 got_lines.append(c2 + textio.readline())
2036 else:
2037 got_lines = list(textio)
2038
2039 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002040 self.assertEqual(got_line, exp_line)
2041 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002042
Antoine Pitrou19690592009-06-12 20:14:08 +00002043 def test_newlines_input(self):
2044 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2046 for newline, expected in [
2047 (None, normalized.decode("ascii").splitlines(True)),
2048 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002049 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2050 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2051 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002053 buf = self.BytesIO(testdata)
2054 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002055 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002056 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002057 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002058
Antoine Pitrou19690592009-06-12 20:14:08 +00002059 def test_newlines_output(self):
2060 testdict = {
2061 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2062 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2063 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2064 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2065 }
2066 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2067 for newline, expected in tests:
2068 buf = self.BytesIO()
2069 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2070 txt.write("AAA\nB")
2071 txt.write("BB\nCCC\n")
2072 txt.write("X\rY\r\nZ")
2073 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002074 self.assertEqual(buf.closed, False)
2075 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002076
2077 def test_destructor(self):
2078 l = []
2079 base = self.BytesIO
2080 class MyBytesIO(base):
2081 def close(self):
2082 l.append(self.getvalue())
2083 base.close(self)
2084 b = MyBytesIO()
2085 t = self.TextIOWrapper(b, encoding="ascii")
2086 t.write("abc")
2087 del t
2088 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002089 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002090
2091 def test_override_destructor(self):
2092 record = []
2093 class MyTextIO(self.TextIOWrapper):
2094 def __del__(self):
2095 record.append(1)
2096 try:
2097 f = super(MyTextIO, self).__del__
2098 except AttributeError:
2099 pass
2100 else:
2101 f()
2102 def close(self):
2103 record.append(2)
2104 super(MyTextIO, self).close()
2105 def flush(self):
2106 record.append(3)
2107 super(MyTextIO, self).flush()
2108 b = self.BytesIO()
2109 t = MyTextIO(b, encoding="ascii")
2110 del t
2111 support.gc_collect()
2112 self.assertEqual(record, [1, 2, 3])
2113
2114 def test_error_through_destructor(self):
2115 # Test that the exception state is not modified by a destructor,
2116 # even if close() fails.
2117 rawio = self.CloseFailureIO()
2118 def f():
2119 self.TextIOWrapper(rawio).xyzzy
2120 with support.captured_output("stderr") as s:
2121 self.assertRaises(AttributeError, f)
2122 s = s.getvalue().strip()
2123 if s:
2124 # The destructor *may* have printed an unraisable error, check it
2125 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002126 self.assertTrue(s.startswith("Exception IOError: "), s)
2127 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002128
2129 # Systematic tests of the text I/O API
2130
Antoine Pitrou19690592009-06-12 20:14:08 +00002131 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002132 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2133 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002134 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002135 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002136 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002137 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002138 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002140 self.assertEqual(f.tell(), 0)
2141 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002142 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002143 self.assertEqual(f.seek(0), 0)
2144 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002145 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002146 self.assertEqual(f.read(2), "ab")
2147 self.assertEqual(f.read(1), "c")
2148 self.assertEqual(f.read(1), "")
2149 self.assertEqual(f.read(), "")
2150 self.assertEqual(f.tell(), cookie)
2151 self.assertEqual(f.seek(0), 0)
2152 self.assertEqual(f.seek(0, 2), cookie)
2153 self.assertEqual(f.write("def"), 3)
2154 self.assertEqual(f.seek(cookie), cookie)
2155 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156 if enc.startswith("utf"):
2157 self.multi_line_test(f, enc)
2158 f.close()
2159
2160 def multi_line_test(self, f, enc):
2161 f.seek(0)
2162 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002163 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002164 wlines = []
2165 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2166 chars = []
2167 for i in range(size):
2168 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002169 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002170 wlines.append((f.tell(), line))
2171 f.write(line)
2172 f.seek(0)
2173 rlines = []
2174 while True:
2175 pos = f.tell()
2176 line = f.readline()
2177 if not line:
2178 break
2179 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002180 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181
Antoine Pitrou19690592009-06-12 20:14:08 +00002182 def test_telling(self):
2183 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002184 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002185 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002186 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002187 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002188 p2 = f.tell()
2189 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002190 self.assertEqual(f.tell(), p0)
2191 self.assertEqual(f.readline(), "\xff\n")
2192 self.assertEqual(f.tell(), p1)
2193 self.assertEqual(f.readline(), "\xff\n")
2194 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002195 f.seek(0)
2196 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002197 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002198 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002199 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002200 f.close()
2201
Antoine Pitrou19690592009-06-12 20:14:08 +00002202 def test_seeking(self):
2203 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002204 prefix_size = chunk_size - 2
2205 u_prefix = "a" * prefix_size
2206 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002207 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002208 u_suffix = "\u8888\n"
2209 suffix = bytes(u_suffix.encode("utf-8"))
2210 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002211 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002212 f.write(line*2)
2213 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002214 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002215 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002216 self.assertEqual(s, prefix.decode("ascii"))
2217 self.assertEqual(f.tell(), prefix_size)
2218 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002219
Antoine Pitrou19690592009-06-12 20:14:08 +00002220 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002221 # Regression test for a specific bug
2222 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002223 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002224 f.write(data)
2225 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002226 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002227 f._CHUNK_SIZE # Just test that it exists
2228 f._CHUNK_SIZE = 2
2229 f.readline()
2230 f.tell()
2231
Antoine Pitrou19690592009-06-12 20:14:08 +00002232 def test_seek_and_tell(self):
2233 #Test seek/tell using the StatefulIncrementalDecoder.
2234 # Make test faster by doing smaller seeks
2235 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002236
Antoine Pitrou19690592009-06-12 20:14:08 +00002237 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002238 """Tell/seek to various points within a data stream and ensure
2239 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002240 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002241 f.write(data)
2242 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002243 f = self.open(support.TESTFN, encoding='test_decoder')
2244 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002245 decoded = f.read()
2246 f.close()
2247
2248 for i in range(min_pos, len(decoded) + 1): # seek positions
2249 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002250 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002251 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002252 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002253 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002254 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002255 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002256 f.close()
2257
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002258 # Enable the test decoder.
2259 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002260
2261 # Run the tests.
2262 try:
2263 # Try each test case.
2264 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002265 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002266
2267 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002268 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2269 offset = CHUNK_SIZE - len(input)//2
2270 prefix = b'.'*offset
2271 # Don't bother seeking into the prefix (takes too long).
2272 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002273 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002274
2275 # Ensure our test decoder won't interfere with subsequent tests.
2276 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002277 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002278
Antoine Pitrou19690592009-06-12 20:14:08 +00002279 def test_encoded_writes(self):
2280 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002281 tests = ("utf-16",
2282 "utf-16-le",
2283 "utf-16-be",
2284 "utf-32",
2285 "utf-32-le",
2286 "utf-32-be")
2287 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002288 buf = self.BytesIO()
2289 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002290 # Check if the BOM is written only once (see issue1753).
2291 f.write(data)
2292 f.write(data)
2293 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002294 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002295 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002296 self.assertEqual(f.read(), data * 2)
2297 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002298
Antoine Pitrou19690592009-06-12 20:14:08 +00002299 def test_unreadable(self):
2300 class UnReadable(self.BytesIO):
2301 def readable(self):
2302 return False
2303 txt = self.TextIOWrapper(UnReadable())
2304 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002305
Antoine Pitrou19690592009-06-12 20:14:08 +00002306 def test_read_one_by_one(self):
2307 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002308 reads = ""
2309 while True:
2310 c = txt.read(1)
2311 if not c:
2312 break
2313 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002314 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002315
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002316 def test_readlines(self):
2317 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2318 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2319 txt.seek(0)
2320 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2321 txt.seek(0)
2322 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2323
Christian Heimes1a6387e2008-03-26 12:49:49 +00002324 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002325 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002326 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002327 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002328 reads = ""
2329 while True:
2330 c = txt.read(128)
2331 if not c:
2332 break
2333 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002334 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002335
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002336 def test_writelines(self):
2337 l = ['ab', 'cd', 'ef']
2338 buf = self.BytesIO()
2339 txt = self.TextIOWrapper(buf)
2340 txt.writelines(l)
2341 txt.flush()
2342 self.assertEqual(buf.getvalue(), b'abcdef')
2343
2344 def test_writelines_userlist(self):
2345 l = UserList(['ab', 'cd', 'ef'])
2346 buf = self.BytesIO()
2347 txt = self.TextIOWrapper(buf)
2348 txt.writelines(l)
2349 txt.flush()
2350 self.assertEqual(buf.getvalue(), b'abcdef')
2351
2352 def test_writelines_error(self):
2353 txt = self.TextIOWrapper(self.BytesIO())
2354 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2355 self.assertRaises(TypeError, txt.writelines, None)
2356 self.assertRaises(TypeError, txt.writelines, b'abc')
2357
Christian Heimes1a6387e2008-03-26 12:49:49 +00002358 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002359 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002360
2361 # read one char at a time
2362 reads = ""
2363 while True:
2364 c = txt.read(1)
2365 if not c:
2366 break
2367 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002368 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002369
2370 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002371 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002372 txt._CHUNK_SIZE = 4
2373
2374 reads = ""
2375 while True:
2376 c = txt.read(4)
2377 if not c:
2378 break
2379 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002380 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002381
2382 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002383 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002384 txt._CHUNK_SIZE = 4
2385
2386 reads = txt.read(4)
2387 reads += txt.read(4)
2388 reads += txt.readline()
2389 reads += txt.readline()
2390 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002391 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002392
2393 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002394 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002395 txt._CHUNK_SIZE = 4
2396
2397 reads = txt.read(4)
2398 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002399 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002400
2401 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002402 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002403 txt._CHUNK_SIZE = 4
2404
2405 reads = txt.read(4)
2406 pos = txt.tell()
2407 txt.seek(0)
2408 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002409 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002410
2411 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002412 buffer = self.BytesIO(self.testdata)
2413 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002414
2415 self.assertEqual(buffer.seekable(), txt.seekable())
2416
Antoine Pitrou19690592009-06-12 20:14:08 +00002417 def test_append_bom(self):
2418 # The BOM is not written again when appending to a non-empty file
2419 filename = support.TESTFN
2420 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2421 with self.open(filename, 'w', encoding=charset) as f:
2422 f.write('aaa')
2423 pos = f.tell()
2424 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002425 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002426
2427 with self.open(filename, 'a', encoding=charset) as f:
2428 f.write('xxx')
2429 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002430 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002431
Antoine Pitrou19690592009-06-12 20:14:08 +00002432 def test_seek_bom(self):
2433 # Same test, but when seeking manually
2434 filename = support.TESTFN
2435 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2436 with self.open(filename, 'w', encoding=charset) as f:
2437 f.write('aaa')
2438 pos = f.tell()
2439 with self.open(filename, 'r+', encoding=charset) as f:
2440 f.seek(pos)
2441 f.write('zzz')
2442 f.seek(0)
2443 f.write('bbb')
2444 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002445 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002446
2447 def test_errors_property(self):
2448 with self.open(support.TESTFN, "w") as f:
2449 self.assertEqual(f.errors, "strict")
2450 with self.open(support.TESTFN, "w", errors="replace") as f:
2451 self.assertEqual(f.errors, "replace")
2452
Victor Stinner6a102812010-04-27 23:55:59 +00002453 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002454 def test_threads_write(self):
2455 # Issue6750: concurrent writes could duplicate data
2456 event = threading.Event()
2457 with self.open(support.TESTFN, "w", buffering=1) as f:
2458 def run(n):
2459 text = "Thread%03d\n" % n
2460 event.wait()
2461 f.write(text)
2462 threads = [threading.Thread(target=lambda n=x: run(n))
2463 for x in range(20)]
2464 for t in threads:
2465 t.start()
2466 time.sleep(0.02)
2467 event.set()
2468 for t in threads:
2469 t.join()
2470 with self.open(support.TESTFN) as f:
2471 content = f.read()
2472 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002473 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002474
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002475 def test_flush_error_on_close(self):
2476 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2477 def bad_flush():
2478 raise IOError()
2479 txt.flush = bad_flush
2480 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002481 self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002482
2483 def test_multi_close(self):
2484 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2485 txt.close()
2486 txt.close()
2487 txt.close()
2488 self.assertRaises(ValueError, txt.flush)
2489
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002490 def test_readonly_attributes(self):
2491 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2492 buf = self.BytesIO(self.testdata)
2493 with self.assertRaises((AttributeError, TypeError)):
2494 txt.buffer = buf
2495
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002496 def test_read_nonbytes(self):
2497 # Issue #17106
2498 # Crash when underlying read() returns non-bytes
2499 class NonbytesStream(self.StringIO):
2500 read1 = self.StringIO.read
2501 class NonbytesStream(self.StringIO):
2502 read1 = self.StringIO.read
2503 t = self.TextIOWrapper(NonbytesStream('a'))
2504 with self.maybeRaises(TypeError):
2505 t.read(1)
2506 t = self.TextIOWrapper(NonbytesStream('a'))
2507 with self.maybeRaises(TypeError):
2508 t.readline()
2509 t = self.TextIOWrapper(NonbytesStream('a'))
2510 self.assertEqual(t.read(), u'a')
2511
2512 def test_illegal_decoder(self):
2513 # Issue #17106
2514 # Crash when decoder returns non-string
2515 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2516 encoding='quopri_codec')
2517 with self.maybeRaises(TypeError):
2518 t.read(1)
2519 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2520 encoding='quopri_codec')
2521 with self.maybeRaises(TypeError):
2522 t.readline()
2523 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2524 encoding='quopri_codec')
2525 with self.maybeRaises(TypeError):
2526 t.read()
2527
2528
Antoine Pitrou19690592009-06-12 20:14:08 +00002529class CTextIOWrapperTest(TextIOWrapperTest):
2530
2531 def test_initialization(self):
2532 r = self.BytesIO(b"\xc3\xa9\n\n")
2533 b = self.BufferedReader(r, 1000)
2534 t = self.TextIOWrapper(b)
2535 self.assertRaises(TypeError, t.__init__, b, newline=42)
2536 self.assertRaises(ValueError, t.read)
2537 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2538 self.assertRaises(ValueError, t.read)
2539
2540 def test_garbage_collection(self):
2541 # C TextIOWrapper objects are collected, and collecting them flushes
2542 # all data to disk.
2543 # The Python version has __del__, so it ends in gc.garbage instead.
2544 rawio = io.FileIO(support.TESTFN, "wb")
2545 b = self.BufferedWriter(rawio)
2546 t = self.TextIOWrapper(b, encoding="ascii")
2547 t.write("456def")
2548 t.x = t
2549 wr = weakref.ref(t)
2550 del t
2551 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002552 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002553 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002554 self.assertEqual(f.read(), b"456def")
2555
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002556 def test_rwpair_cleared_before_textio(self):
2557 # Issue 13070: TextIOWrapper's finalization would crash when called
2558 # after the reference to the underlying BufferedRWPair's writer got
2559 # cleared by the GC.
2560 for i in range(1000):
2561 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2562 t1 = self.TextIOWrapper(b1, encoding="ascii")
2563 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2564 t2 = self.TextIOWrapper(b2, encoding="ascii")
2565 # circular references
2566 t1.buddy = t2
2567 t2.buddy = t1
2568 support.gc_collect()
2569
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002570 maybeRaises = unittest.TestCase.assertRaises
2571
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002572
Antoine Pitrou19690592009-06-12 20:14:08 +00002573class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002574 @contextlib.contextmanager
2575 def maybeRaises(self, *args, **kwds):
2576 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002577
2578
2579class IncrementalNewlineDecoderTest(unittest.TestCase):
2580
2581 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002582 # UTF-8 specific tests for a newline decoder
2583 def _check_decode(b, s, **kwargs):
2584 # We exercise getstate() / setstate() as well as decode()
2585 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002586 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002587 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002588 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002589
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002590 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002591
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002592 _check_decode(b'\xe8', "")
2593 _check_decode(b'\xa2', "")
2594 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002595
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002596 _check_decode(b'\xe8', "")
2597 _check_decode(b'\xa2', "")
2598 _check_decode(b'\x88', "\u8888")
2599
2600 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002601 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2602
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002603 decoder.reset()
2604 _check_decode(b'\n', "\n")
2605 _check_decode(b'\r', "")
2606 _check_decode(b'', "\n", final=True)
2607 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002608
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002609 _check_decode(b'\r', "")
2610 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002611
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002612 _check_decode(b'\r\r\n', "\n\n")
2613 _check_decode(b'\r', "")
2614 _check_decode(b'\r', "\n")
2615 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002616
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002617 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2618 _check_decode(b'\xe8\xa2\x88', "\u8888")
2619 _check_decode(b'\n', "\n")
2620 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2621 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002622
Antoine Pitrou19690592009-06-12 20:14:08 +00002623 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002624 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002625 if encoding is not None:
2626 encoder = codecs.getincrementalencoder(encoding)()
2627 def _decode_bytewise(s):
2628 # Decode one byte at a time
2629 for b in encoder.encode(s):
2630 result.append(decoder.decode(b))
2631 else:
2632 encoder = None
2633 def _decode_bytewise(s):
2634 # Decode one char at a time
2635 for c in s:
2636 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002637 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002638 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002639 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002640 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002641 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002642 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002643 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002644 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002645 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002646 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002647 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002648 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002649 input = "abc"
2650 if encoder is not None:
2651 encoder.reset()
2652 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002653 self.assertEqual(decoder.decode(input), "abc")
2654 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002655
2656 def test_newline_decoder(self):
2657 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002658 # None meaning the IncrementalNewlineDecoder takes unicode input
2659 # rather than bytes input
2660 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002661 'utf-16', 'utf-16-le', 'utf-16-be',
2662 'utf-32', 'utf-32-le', 'utf-32-be',
2663 )
2664 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002665 decoder = enc and codecs.getincrementaldecoder(enc)()
2666 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2667 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002668 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002669 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2670 self.check_newline_decoding_utf8(decoder)
2671
2672 def test_newline_bytes(self):
2673 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2674 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002675 self.assertEqual(dec.newlines, None)
2676 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2677 self.assertEqual(dec.newlines, None)
2678 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2679 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002680 dec = self.IncrementalNewlineDecoder(None, translate=False)
2681 _check(dec)
2682 dec = self.IncrementalNewlineDecoder(None, translate=True)
2683 _check(dec)
2684
2685class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2686 pass
2687
2688class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2689 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002690
Christian Heimes1a6387e2008-03-26 12:49:49 +00002691
2692# XXX Tests for open()
2693
2694class MiscIOTest(unittest.TestCase):
2695
Benjamin Petersonad100c32008-11-20 22:06:22 +00002696 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002697 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002698
Antoine Pitrou19690592009-06-12 20:14:08 +00002699 def test___all__(self):
2700 for name in self.io.__all__:
2701 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002702 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002703 if name == "open":
2704 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002705 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002706 self.assertTrue(issubclass(obj, Exception), name)
2707 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002708 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002709
Benjamin Petersonad100c32008-11-20 22:06:22 +00002710 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002711 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002712 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002713 f.close()
2714
Antoine Pitrou19690592009-06-12 20:14:08 +00002715 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002716 self.assertEqual(f.name, support.TESTFN)
2717 self.assertEqual(f.buffer.name, support.TESTFN)
2718 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2719 self.assertEqual(f.mode, "U")
2720 self.assertEqual(f.buffer.mode, "rb")
2721 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002722 f.close()
2723
Antoine Pitrou19690592009-06-12 20:14:08 +00002724 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002725 self.assertEqual(f.mode, "w+")
2726 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2727 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002728
Antoine Pitrou19690592009-06-12 20:14:08 +00002729 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002730 self.assertEqual(g.mode, "wb")
2731 self.assertEqual(g.raw.mode, "wb")
2732 self.assertEqual(g.name, f.fileno())
2733 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002734 f.close()
2735 g.close()
2736
Antoine Pitrou19690592009-06-12 20:14:08 +00002737 def test_io_after_close(self):
2738 for kwargs in [
2739 {"mode": "w"},
2740 {"mode": "wb"},
2741 {"mode": "w", "buffering": 1},
2742 {"mode": "w", "buffering": 2},
2743 {"mode": "wb", "buffering": 0},
2744 {"mode": "r"},
2745 {"mode": "rb"},
2746 {"mode": "r", "buffering": 1},
2747 {"mode": "r", "buffering": 2},
2748 {"mode": "rb", "buffering": 0},
2749 {"mode": "w+"},
2750 {"mode": "w+b"},
2751 {"mode": "w+", "buffering": 1},
2752 {"mode": "w+", "buffering": 2},
2753 {"mode": "w+b", "buffering": 0},
2754 ]:
2755 f = self.open(support.TESTFN, **kwargs)
2756 f.close()
2757 self.assertRaises(ValueError, f.flush)
2758 self.assertRaises(ValueError, f.fileno)
2759 self.assertRaises(ValueError, f.isatty)
2760 self.assertRaises(ValueError, f.__iter__)
2761 if hasattr(f, "peek"):
2762 self.assertRaises(ValueError, f.peek, 1)
2763 self.assertRaises(ValueError, f.read)
2764 if hasattr(f, "read1"):
2765 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002766 if hasattr(f, "readall"):
2767 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002768 if hasattr(f, "readinto"):
2769 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2770 self.assertRaises(ValueError, f.readline)
2771 self.assertRaises(ValueError, f.readlines)
2772 self.assertRaises(ValueError, f.seek, 0)
2773 self.assertRaises(ValueError, f.tell)
2774 self.assertRaises(ValueError, f.truncate)
2775 self.assertRaises(ValueError, f.write,
2776 b"" if "b" in kwargs['mode'] else "")
2777 self.assertRaises(ValueError, f.writelines, [])
2778 self.assertRaises(ValueError, next, f)
2779
2780 def test_blockingioerror(self):
2781 # Various BlockingIOError issues
2782 self.assertRaises(TypeError, self.BlockingIOError)
2783 self.assertRaises(TypeError, self.BlockingIOError, 1)
2784 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2785 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2786 b = self.BlockingIOError(1, "")
2787 self.assertEqual(b.characters_written, 0)
2788 class C(unicode):
2789 pass
2790 c = C("")
2791 b = self.BlockingIOError(1, c)
2792 c.b = b
2793 b.c = c
2794 wr = weakref.ref(c)
2795 del c, b
2796 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002797 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002798
2799 def test_abcs(self):
2800 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002801 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2802 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2803 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2804 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002805
2806 def _check_abc_inheritance(self, abcmodule):
2807 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002808 self.assertIsInstance(f, abcmodule.IOBase)
2809 self.assertIsInstance(f, abcmodule.RawIOBase)
2810 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2811 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002812 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002813 self.assertIsInstance(f, abcmodule.IOBase)
2814 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2815 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2816 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002817 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002818 self.assertIsInstance(f, abcmodule.IOBase)
2819 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2820 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2821 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002822
2823 def test_abc_inheritance(self):
2824 # Test implementations inherit from their respective ABCs
2825 self._check_abc_inheritance(self)
2826
2827 def test_abc_inheritance_official(self):
2828 # Test implementations inherit from the official ABCs of the
2829 # baseline "io" module.
2830 self._check_abc_inheritance(io)
2831
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002832 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2833 def test_nonblock_pipe_write_bigbuf(self):
2834 self._test_nonblock_pipe_write(16*1024)
2835
2836 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2837 def test_nonblock_pipe_write_smallbuf(self):
2838 self._test_nonblock_pipe_write(1024)
2839
2840 def _set_non_blocking(self, fd):
2841 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2842 self.assertNotEqual(flags, -1)
2843 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2844 self.assertEqual(res, 0)
2845
2846 def _test_nonblock_pipe_write(self, bufsize):
2847 sent = []
2848 received = []
2849 r, w = os.pipe()
2850 self._set_non_blocking(r)
2851 self._set_non_blocking(w)
2852
2853 # To exercise all code paths in the C implementation we need
2854 # to play with buffer sizes. For instance, if we choose a
2855 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2856 # then we will never get a partial write of the buffer.
2857 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2858 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2859
2860 with rf, wf:
2861 for N in 9999, 73, 7574:
2862 try:
2863 i = 0
2864 while True:
2865 msg = bytes([i % 26 + 97]) * N
2866 sent.append(msg)
2867 wf.write(msg)
2868 i += 1
2869
2870 except self.BlockingIOError as e:
2871 self.assertEqual(e.args[0], errno.EAGAIN)
2872 sent[-1] = sent[-1][:e.characters_written]
2873 received.append(rf.read())
2874 msg = b'BLOCKED'
2875 wf.write(msg)
2876 sent.append(msg)
2877
2878 while True:
2879 try:
2880 wf.flush()
2881 break
2882 except self.BlockingIOError as e:
2883 self.assertEqual(e.args[0], errno.EAGAIN)
2884 self.assertEqual(e.characters_written, 0)
2885 received.append(rf.read())
2886
2887 received += iter(rf.read, None)
2888
2889 sent, received = b''.join(sent), b''.join(received)
2890 self.assertTrue(sent == received)
2891 self.assertTrue(wf.closed)
2892 self.assertTrue(rf.closed)
2893
Antoine Pitrou19690592009-06-12 20:14:08 +00002894class CMiscIOTest(MiscIOTest):
2895 io = io
2896
2897class PyMiscIOTest(MiscIOTest):
2898 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002899
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002900
2901@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2902class SignalsTest(unittest.TestCase):
2903
2904 def setUp(self):
2905 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2906
2907 def tearDown(self):
2908 signal.signal(signal.SIGALRM, self.oldalrm)
2909
2910 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002911 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002912
2913 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002914 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2915 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002916 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2917 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002918 invokes the signal handler, and bubbles up the exception raised
2919 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002920 read_results = []
2921 def _read():
2922 s = os.read(r, 1)
2923 read_results.append(s)
2924 t = threading.Thread(target=_read)
2925 t.daemon = True
2926 r, w = os.pipe()
2927 try:
2928 wio = self.io.open(w, **fdopen_kwargs)
2929 t.start()
2930 signal.alarm(1)
2931 # Fill the pipe enough that the write will be blocking.
2932 # It will be interrupted by the timer armed above. Since the
2933 # other thread has read one byte, the low-level write will
2934 # return with a successful (partial) result rather than an EINTR.
2935 # The buffered IO layer must check for pending signal
2936 # handlers, which in this case will invoke alarm_interrupt().
2937 self.assertRaises(ZeroDivisionError,
Antoine Pitrou68915d72013-04-24 23:31:38 +02002938 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002939 t.join()
2940 # We got one byte, get another one and check that it isn't a
2941 # repeat of the first one.
2942 read_results.append(os.read(r, 1))
2943 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2944 finally:
2945 os.close(w)
2946 os.close(r)
2947 # This is deliberate. If we didn't close the file descriptor
2948 # before closing wio, wio would try to flush its internal
2949 # buffer, and block again.
2950 try:
2951 wio.close()
2952 except IOError as e:
2953 if e.errno != errno.EBADF:
2954 raise
2955
2956 def test_interrupted_write_unbuffered(self):
2957 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2958
2959 def test_interrupted_write_buffered(self):
2960 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2961
2962 def test_interrupted_write_text(self):
2963 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2964
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002965 def check_reentrant_write(self, data, **fdopen_kwargs):
2966 def on_alarm(*args):
2967 # Will be called reentrantly from the same thread
2968 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002969 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002970 signal.signal(signal.SIGALRM, on_alarm)
2971 r, w = os.pipe()
2972 wio = self.io.open(w, **fdopen_kwargs)
2973 try:
2974 signal.alarm(1)
2975 # Either the reentrant call to wio.write() fails with RuntimeError,
2976 # or the signal handler raises ZeroDivisionError.
2977 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2978 while 1:
2979 for i in range(100):
2980 wio.write(data)
2981 wio.flush()
2982 # Make sure the buffer doesn't fill up and block further writes
2983 os.read(r, len(data) * 100)
2984 exc = cm.exception
2985 if isinstance(exc, RuntimeError):
2986 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2987 finally:
2988 wio.close()
2989 os.close(r)
2990
2991 def test_reentrant_write_buffered(self):
2992 self.check_reentrant_write(b"xy", mode="wb")
2993
2994 def test_reentrant_write_text(self):
2995 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2996
Antoine Pitrou6439c002011-02-25 21:35:47 +00002997 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2998 """Check that a buffered read, when it gets interrupted (either
2999 returning a partial result or EINTR), properly invokes the signal
3000 handler and retries if the latter returned successfully."""
3001 r, w = os.pipe()
3002 fdopen_kwargs["closefd"] = False
3003 def alarm_handler(sig, frame):
3004 os.write(w, b"bar")
3005 signal.signal(signal.SIGALRM, alarm_handler)
3006 try:
3007 rio = self.io.open(r, **fdopen_kwargs)
3008 os.write(w, b"foo")
3009 signal.alarm(1)
3010 # Expected behaviour:
3011 # - first raw read() returns partial b"foo"
3012 # - second raw read() returns EINTR
3013 # - third raw read() returns b"bar"
3014 self.assertEqual(decode(rio.read(6)), "foobar")
3015 finally:
3016 rio.close()
3017 os.close(w)
3018 os.close(r)
3019
3020 def test_interrupterd_read_retry_buffered(self):
3021 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3022 mode="rb")
3023
3024 def test_interrupterd_read_retry_text(self):
3025 self.check_interrupted_read_retry(lambda x: x,
3026 mode="r")
3027
3028 @unittest.skipUnless(threading, 'Threading required for this test.')
3029 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3030 """Check that a buffered write, when it gets interrupted (either
3031 returning a partial result or EINTR), properly invokes the signal
3032 handler and retries if the latter returned successfully."""
3033 select = support.import_module("select")
3034 # A quantity that exceeds the buffer size of an anonymous pipe's
3035 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003036 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003037 r, w = os.pipe()
3038 fdopen_kwargs["closefd"] = False
3039 # We need a separate thread to read from the pipe and allow the
3040 # write() to finish. This thread is started after the SIGALRM is
3041 # received (forcing a first EINTR in write()).
3042 read_results = []
3043 write_finished = False
3044 def _read():
3045 while not write_finished:
3046 while r in select.select([r], [], [], 1.0)[0]:
3047 s = os.read(r, 1024)
3048 read_results.append(s)
3049 t = threading.Thread(target=_read)
3050 t.daemon = True
3051 def alarm1(sig, frame):
3052 signal.signal(signal.SIGALRM, alarm2)
3053 signal.alarm(1)
3054 def alarm2(sig, frame):
3055 t.start()
3056 signal.signal(signal.SIGALRM, alarm1)
3057 try:
3058 wio = self.io.open(w, **fdopen_kwargs)
3059 signal.alarm(1)
3060 # Expected behaviour:
3061 # - first raw write() is partial (because of the limited pipe buffer
3062 # and the first alarm)
3063 # - second raw write() returns EINTR (because of the second alarm)
3064 # - subsequent write()s are successful (either partial or complete)
3065 self.assertEqual(N, wio.write(item * N))
3066 wio.flush()
3067 write_finished = True
3068 t.join()
3069 self.assertEqual(N, sum(len(x) for x in read_results))
3070 finally:
3071 write_finished = True
3072 os.close(w)
3073 os.close(r)
3074 # This is deliberate. If we didn't close the file descriptor
3075 # before closing wio, wio would try to flush its internal
3076 # buffer, and could block (in case of failure).
3077 try:
3078 wio.close()
3079 except IOError as e:
3080 if e.errno != errno.EBADF:
3081 raise
3082
3083 def test_interrupterd_write_retry_buffered(self):
3084 self.check_interrupted_write_retry(b"x", mode="wb")
3085
3086 def test_interrupterd_write_retry_text(self):
3087 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3088
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003089
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003090class CSignalsTest(SignalsTest):
3091 io = io
3092
3093class PySignalsTest(SignalsTest):
3094 io = pyio
3095
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003096 # Handling reentrancy issues would slow down _pyio even more, so the
3097 # tests are disabled.
3098 test_reentrant_write_buffered = None
3099 test_reentrant_write_text = None
3100
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003101
Christian Heimes1a6387e2008-03-26 12:49:49 +00003102def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003103 tests = (CIOTest, PyIOTest,
3104 CBufferedReaderTest, PyBufferedReaderTest,
3105 CBufferedWriterTest, PyBufferedWriterTest,
3106 CBufferedRWPairTest, PyBufferedRWPairTest,
3107 CBufferedRandomTest, PyBufferedRandomTest,
3108 StatefulIncrementalDecoderTest,
3109 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3110 CTextIOWrapperTest, PyTextIOWrapperTest,
3111 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003112 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003113 )
3114
3115 # Put the namespaces of the IO module we are testing and some useful mock
3116 # classes in the __dict__ of each test.
3117 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003118 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003119 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3120 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3121 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3122 globs = globals()
3123 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3124 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3125 # Avoid turning open into a bound method.
3126 py_io_ns["open"] = pyio.OpenWrapper
3127 for test in tests:
3128 if test.__name__.startswith("C"):
3129 for name, obj in c_io_ns.items():
3130 setattr(test, name, obj)
3131 elif test.__name__.startswith("Py"):
3132 for name, obj in py_io_ns.items():
3133 setattr(test, name, obj)
3134
3135 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003136
3137if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003138 test_main()