blob: af3b90ab80af1616eb26db9dca929d14d82570d8 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +030032import warnings
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000034import signal
35import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000036from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000037from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020038from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000039from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020040import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000041
42import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000043import io # C implementation of io
44import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000045try:
46 import threading
47except ImportError:
48 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010049try:
50 import fcntl
51except ImportError:
52 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000053
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def _default_chunk_size():
58 """Get the default TextIOWrapper chunk size"""
59 with io.open(__file__, "r", encoding="latin1") as f:
60 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000061
62
Antoine Pitrou6391b342010-09-14 18:48:19 +000063class MockRawIOWithoutRead:
64 """A RawIO implementation without read(), so as to exercise the default
65 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000066
67 def __init__(self, read_stack=()):
68 self._read_stack = list(read_stack)
69 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000070 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000071 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000072
Christian Heimes1a6387e2008-03-26 12:49:49 +000073 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000074 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000075 return len(b)
76
77 def writable(self):
78 return True
79
80 def fileno(self):
81 return 42
82
83 def readable(self):
84 return True
85
86 def seekable(self):
87 return True
88
89 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000090 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000091
92 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000093 return 0 # same comment as above
94
95 def readinto(self, buf):
96 self._reads += 1
97 max_len = len(buf)
98 try:
99 data = self._read_stack[0]
100 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000101 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000102 return 0
103 if data is None:
104 del self._read_stack[0]
105 return None
106 n = len(data)
107 if len(data) <= max_len:
108 del self._read_stack[0]
109 buf[:n] = data
110 return n
111 else:
112 buf[:] = data[:max_len]
113 self._read_stack[0] = data[max_len:]
114 return max_len
115
116 def truncate(self, pos=None):
117 return pos
118
Antoine Pitrou6391b342010-09-14 18:48:19 +0000119class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
120 pass
121
122class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
123 pass
124
125
126class MockRawIO(MockRawIOWithoutRead):
127
128 def read(self, n=None):
129 self._reads += 1
130 try:
131 return self._read_stack.pop(0)
132 except:
133 self._extraneous_reads += 1
134 return b""
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class CMockRawIO(MockRawIO, io.RawIOBase):
137 pass
138
139class PyMockRawIO(MockRawIO, pyio.RawIOBase):
140 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000141
142
Antoine Pitrou19690592009-06-12 20:14:08 +0000143class MisbehavedRawIO(MockRawIO):
144 def write(self, b):
145 return MockRawIO.write(self, b) * 2
146
147 def read(self, n=None):
148 return MockRawIO.read(self, n) * 2
149
150 def seek(self, pos, whence):
151 return -123
152
153 def tell(self):
154 return -456
155
156 def readinto(self, buf):
157 MockRawIO.readinto(self, buf)
158 return len(buf) * 5
159
160class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
161 pass
162
163class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
164 pass
165
166
167class CloseFailureIO(MockRawIO):
168 closed = 0
169
170 def close(self):
171 if not self.closed:
172 self.closed = 1
173 raise IOError
174
175class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
176 pass
177
178class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
179 pass
180
181
182class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183
184 def __init__(self, data):
185 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
188 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000189 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190 self.read_history.append(None if res is None else len(res))
191 return res
192
Antoine Pitrou19690592009-06-12 20:14:08 +0000193 def readinto(self, b):
194 res = super(MockFileIO, self).readinto(b)
195 self.read_history.append(res)
196 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
Antoine Pitrou19690592009-06-12 20:14:08 +0000198class CMockFileIO(MockFileIO, io.BytesIO):
199 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200
Antoine Pitrou19690592009-06-12 20:14:08 +0000201class PyMockFileIO(MockFileIO, pyio.BytesIO):
202 pass
203
204
205class MockNonBlockWriterIO:
206
207 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000208 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000209 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000210
Antoine Pitrou19690592009-06-12 20:14:08 +0000211 def pop_written(self):
212 s = b"".join(self._write_stack)
213 self._write_stack[:] = []
214 return s
215
216 def block_on(self, char):
217 """Block when a given char is encountered."""
218 self._blocker_char = char
219
220 def readable(self):
221 return True
222
223 def seekable(self):
224 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
226 def writable(self):
227 return True
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def write(self, b):
230 b = bytes(b)
231 n = -1
232 if self._blocker_char:
233 try:
234 n = b.index(self._blocker_char)
235 except ValueError:
236 pass
237 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100238 if n > 0:
239 # write data up to the first blocker
240 self._write_stack.append(b[:n])
241 return n
242 else:
243 # cancel blocker and indicate would block
244 self._blocker_char = None
245 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 self._write_stack.append(b)
247 return len(b)
248
249class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
250 BlockingIOError = io.BlockingIOError
251
252class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
253 BlockingIOError = pyio.BlockingIOError
254
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255
256class IOTest(unittest.TestCase):
257
Antoine Pitrou19690592009-06-12 20:14:08 +0000258 def setUp(self):
259 support.unlink(support.TESTFN)
260
Christian Heimes1a6387e2008-03-26 12:49:49 +0000261 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000262 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000263
264 def write_ops(self, f):
265 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000266 f.truncate(0)
267 self.assertEqual(f.tell(), 5)
268 f.seek(0)
269
270 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000271 self.assertEqual(f.seek(0), 0)
272 self.assertEqual(f.write(b"Hello."), 6)
273 self.assertEqual(f.tell(), 6)
274 self.assertEqual(f.seek(-1, 1), 5)
275 self.assertEqual(f.tell(), 5)
276 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.write(b"h"), 1)
279 self.assertEqual(f.seek(-1, 2), 13)
280 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000281
Christian Heimes1a6387e2008-03-26 12:49:49 +0000282 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000283 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000284 self.assertRaises(TypeError, f.seek, 0.0)
285
286 def read_ops(self, f, buffered=False):
287 data = f.read(5)
288 self.assertEqual(data, b"hello")
289 data = bytearray(data)
290 self.assertEqual(f.readinto(data), 5)
291 self.assertEqual(data, b" worl")
292 self.assertEqual(f.readinto(data), 2)
293 self.assertEqual(len(data), 5)
294 self.assertEqual(data[:2], b"d\n")
295 self.assertEqual(f.seek(0), 0)
296 self.assertEqual(f.read(20), b"hello world\n")
297 self.assertEqual(f.read(1), b"")
298 self.assertEqual(f.readinto(bytearray(b"x")), 0)
299 self.assertEqual(f.seek(-6, 2), 6)
300 self.assertEqual(f.read(5), b"world")
301 self.assertEqual(f.read(0), b"")
302 self.assertEqual(f.readinto(bytearray()), 0)
303 self.assertEqual(f.seek(-6, 1), 5)
304 self.assertEqual(f.read(5), b" worl")
305 self.assertEqual(f.tell(), 10)
306 self.assertRaises(TypeError, f.seek, 0.0)
307 if buffered:
308 f.seek(0)
309 self.assertEqual(f.read(), b"hello world\n")
310 f.seek(6)
311 self.assertEqual(f.read(), b"world\n")
312 self.assertEqual(f.read(), b"")
313
314 LARGE = 2**31
315
316 def large_file_ops(self, f):
317 assert f.readable()
318 assert f.writable()
319 self.assertEqual(f.seek(self.LARGE), self.LARGE)
320 self.assertEqual(f.tell(), self.LARGE)
321 self.assertEqual(f.write(b"xxx"), 3)
322 self.assertEqual(f.tell(), self.LARGE + 3)
323 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
324 self.assertEqual(f.truncate(), self.LARGE + 2)
325 self.assertEqual(f.tell(), self.LARGE + 2)
326 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
327 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000328 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
330 self.assertEqual(f.seek(-1, 2), self.LARGE)
331 self.assertEqual(f.read(2), b"x")
332
Antoine Pitrou19690592009-06-12 20:14:08 +0000333 def test_invalid_operations(self):
334 # Try writing on a file opened in read mode and vice-versa.
335 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000336 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000337 self.assertRaises(IOError, fp.read)
338 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000339 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 self.assertRaises(IOError, fp.write, b"blah")
341 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000342 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 self.assertRaises(IOError, fp.write, "blah")
344 self.assertRaises(IOError, fp.writelines, ["blah\n"])
345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb", buffering=0) as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb", buffering=0) as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb") as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369
370 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000371 with self.open(support.TESTFN, "wb") as f:
372 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
373 with self.open(support.TESTFN, "rb") as f:
374 self.assertEqual(f.readline(), b"abc\n")
375 self.assertEqual(f.readline(10), b"def\n")
376 self.assertEqual(f.readline(2), b"xy")
377 self.assertEqual(f.readline(4), b"zzy\n")
378 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000379 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000380 self.assertRaises(TypeError, f.readline, 5.3)
381 with self.open(support.TESTFN, "r") as f:
382 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383
384 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000385 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 self.write_ops(f)
387 data = f.getvalue()
388 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000389 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000390 self.read_ops(f, True)
391
392 def test_large_file_ops(self):
393 # On Windows and Mac OSX this test comsumes large resources; It takes
394 # a long time to build the >2GB file and takes >2GB of disk space
395 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600397 support.requires(
398 'largefile',
399 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "w+b", 0) as f:
401 self.large_file_ops(f)
402 with self.open(support.TESTFN, "w+b") as f:
403 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404
405 def test_with_open(self):
406 for bufsize in (0, 1, 100):
407 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000408 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000409 f.write(b"xxx")
410 self.assertEqual(f.closed, True)
411 f = None
412 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000413 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000414 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 except ZeroDivisionError:
416 self.assertEqual(f.closed, True)
417 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000418 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 # issue 5008
421 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000422 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000425 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000427 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000429 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000430
Christian Heimes1a6387e2008-03-26 12:49:49 +0000431 def test_destructor(self):
432 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000433 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000434 def __del__(self):
435 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000436 try:
437 f = super(MyFileIO, self).__del__
438 except AttributeError:
439 pass
440 else:
441 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000442 def close(self):
443 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000445 def flush(self):
446 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 super(MyFileIO, self).flush()
448 f = MyFileIO(support.TESTFN, "wb")
449 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000450 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000451 support.gc_collect()
452 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000453 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000454 self.assertEqual(f.read(), b"xxx")
455
456 def _check_base_destructor(self, base):
457 record = []
458 class MyIO(base):
459 def __init__(self):
460 # This exercises the availability of attributes on object
461 # destruction.
462 # (in the C version, close() is called by the tp_dealloc
463 # function, not by __del__)
464 self.on_del = 1
465 self.on_close = 2
466 self.on_flush = 3
467 def __del__(self):
468 record.append(self.on_del)
469 try:
470 f = super(MyIO, self).__del__
471 except AttributeError:
472 pass
473 else:
474 f()
475 def close(self):
476 record.append(self.on_close)
477 super(MyIO, self).close()
478 def flush(self):
479 record.append(self.on_flush)
480 super(MyIO, self).flush()
481 f = MyIO()
482 del f
483 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 self.assertEqual(record, [1, 2, 3])
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_IOBase_destructor(self):
487 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 def test_RawIOBase_destructor(self):
490 self._check_base_destructor(self.RawIOBase)
491
492 def test_BufferedIOBase_destructor(self):
493 self._check_base_destructor(self.BufferedIOBase)
494
495 def test_TextIOBase_destructor(self):
496 self._check_base_destructor(self.TextIOBase)
497
498 def test_close_flushes(self):
499 with self.open(support.TESTFN, "wb") as f:
500 f.write(b"xxx")
501 with self.open(support.TESTFN, "rb") as f:
502 self.assertEqual(f.read(), b"xxx")
503
504 def test_array_writes(self):
505 a = array.array(b'i', range(10))
506 n = len(a.tostring())
507 with self.open(support.TESTFN, "wb", 0) as f:
508 self.assertEqual(f.write(a), n)
509 with self.open(support.TESTFN, "wb") as f:
510 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000511
512 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000513 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000514 closefd=False)
515
Antoine Pitrou19690592009-06-12 20:14:08 +0000516 def test_read_closed(self):
517 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000518 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 with self.open(support.TESTFN, "r") as f:
520 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000521 self.assertEqual(file.read(), "egg\n")
522 file.seek(0)
523 file.close()
524 self.assertRaises(ValueError, file.read)
525
526 def test_no_closefd_with_filename(self):
527 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529
530 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000532 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000534 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000536 self.assertEqual(file.buffer.raw.closefd, False)
537
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 def test_garbage_collection(self):
539 # FileIO objects are collected, and collecting them flushes
540 # all data to disk.
541 f = self.FileIO(support.TESTFN, "wb")
542 f.write(b"abcxxx")
543 f.f = f
544 wr = weakref.ref(f)
545 del f
546 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000547 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000548 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000549 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000550
Antoine Pitrou19690592009-06-12 20:14:08 +0000551 def test_unbounded_file(self):
552 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
553 zero = "/dev/zero"
554 if not os.path.exists(zero):
555 self.skipTest("{0} does not exist".format(zero))
556 if sys.maxsize > 0x7FFFFFFF:
557 self.skipTest("test can only run in a 32-bit address space")
558 if support.real_max_memuse < support._2G:
559 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000560 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000561 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000562 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000563 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000564 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000565 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000566
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000567 def test_flush_error_on_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 def bad_flush():
570 raise IOError()
571 f.flush = bad_flush
572 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600573 self.assertTrue(f.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000574
575 def test_multi_close(self):
576 f = self.open(support.TESTFN, "wb", buffering=0)
577 f.close()
578 f.close()
579 f.close()
580 self.assertRaises(ValueError, f.flush)
581
Antoine Pitrou6391b342010-09-14 18:48:19 +0000582 def test_RawIOBase_read(self):
583 # Exercise the default RawIOBase.read() implementation (which calls
584 # readinto() internally).
585 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
586 self.assertEqual(rawio.read(2), b"ab")
587 self.assertEqual(rawio.read(2), b"c")
588 self.assertEqual(rawio.read(2), b"d")
589 self.assertEqual(rawio.read(2), None)
590 self.assertEqual(rawio.read(2), b"ef")
591 self.assertEqual(rawio.read(2), b"g")
592 self.assertEqual(rawio.read(2), None)
593 self.assertEqual(rawio.read(2), b"")
594
Hynek Schlawack877effc2012-05-25 09:24:18 +0200595 def test_fileio_closefd(self):
596 # Issue #4841
597 with self.open(__file__, 'rb') as f1, \
598 self.open(__file__, 'rb') as f2:
599 fileio = self.FileIO(f1.fileno(), closefd=False)
600 # .__init__() must not close f1
601 fileio.__init__(f2.fileno(), closefd=False)
602 f1.readline()
603 # .close() must not close f2
604 fileio.close()
605 f2.readline()
606
Serhiy Storchaka05b0a1b2014-06-09 13:32:08 +0300607 def test_nonbuffered_textio(self):
608 with warnings.catch_warnings(record=True) as recorded:
609 with self.assertRaises(ValueError):
610 self.open(support.TESTFN, 'w', buffering=0)
611 support.gc_collect()
612 self.assertEqual(recorded, [])
613
614 def test_invalid_newline(self):
615 with warnings.catch_warnings(record=True) as recorded:
616 with self.assertRaises(ValueError):
617 self.open(support.TESTFN, 'w', newline='invalid')
618 support.gc_collect()
619 self.assertEqual(recorded, [])
620
Hynek Schlawack877effc2012-05-25 09:24:18 +0200621
Antoine Pitrou19690592009-06-12 20:14:08 +0000622class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200623
624 def test_IOBase_finalize(self):
625 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
626 # class which inherits IOBase and an object of this class are caught
627 # in a reference cycle and close() is already in the method cache.
628 class MyIO(self.IOBase):
629 def close(self):
630 pass
631
632 # create an instance to populate the method cache
633 MyIO()
634 obj = MyIO()
635 obj.obj = obj
636 wr = weakref.ref(obj)
637 del MyIO
638 del obj
639 support.gc_collect()
640 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000641
Antoine Pitrou19690592009-06-12 20:14:08 +0000642class PyIOTest(IOTest):
643 test_array_writes = unittest.skip(
644 "len(array.array) returns number of elements rather than bytelength"
645 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000646
647
Antoine Pitrou19690592009-06-12 20:14:08 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
657 def test_fileno(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660
Ezio Melotti2623a372010-11-21 13:34:58 +0000661 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000662
Zachary Ware1f702212013-12-10 14:09:20 -0600663 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000664 def test_no_fileno(self):
665 # XXX will we always have fileno() function? If so, kill
666 # this test. Else, write it.
667 pass
668
669 def test_invalid_args(self):
670 rawio = self.MockRawIO()
671 bufio = self.tp(rawio)
672 # Invalid whence
673 self.assertRaises(ValueError, bufio.seek, 0, -1)
674 self.assertRaises(ValueError, bufio.seek, 0, 3)
675
676 def test_override_destructor(self):
677 tp = self.tp
678 record = []
679 class MyBufferedIO(tp):
680 def __del__(self):
681 record.append(1)
682 try:
683 f = super(MyBufferedIO, self).__del__
684 except AttributeError:
685 pass
686 else:
687 f()
688 def close(self):
689 record.append(2)
690 super(MyBufferedIO, self).close()
691 def flush(self):
692 record.append(3)
693 super(MyBufferedIO, self).flush()
694 rawio = self.MockRawIO()
695 bufio = MyBufferedIO(rawio)
696 writable = bufio.writable()
697 del bufio
698 support.gc_collect()
699 if writable:
700 self.assertEqual(record, [1, 2, 3])
701 else:
702 self.assertEqual(record, [1, 2])
703
704 def test_context_manager(self):
705 # Test usability as a context manager
706 rawio = self.MockRawIO()
707 bufio = self.tp(rawio)
708 def _with():
709 with bufio:
710 pass
711 _with()
712 # bufio should now be closed, and using it a second time should raise
713 # a ValueError.
714 self.assertRaises(ValueError, _with)
715
716 def test_error_through_destructor(self):
717 # Test that the exception state is not modified by a destructor,
718 # even if close() fails.
719 rawio = self.CloseFailureIO()
720 def f():
721 self.tp(rawio).xyzzy
722 with support.captured_output("stderr") as s:
723 self.assertRaises(AttributeError, f)
724 s = s.getvalue().strip()
725 if s:
726 # The destructor *may* have printed an unraisable error, check it
727 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000728 self.assertTrue(s.startswith("Exception IOError: "), s)
729 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000730
731 def test_repr(self):
732 raw = self.MockRawIO()
733 b = self.tp(raw)
734 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
735 self.assertEqual(repr(b), "<%s>" % clsname)
736 raw.name = "dummy"
737 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
738 raw.name = b"dummy"
739 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000740
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000741 def test_flush_error_on_close(self):
742 raw = self.MockRawIO()
743 def bad_flush():
744 raise IOError()
745 raw.flush = bad_flush
746 b = self.tp(raw)
747 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600748 self.assertTrue(b.closed)
749
750 def test_close_error_on_close(self):
751 raw = self.MockRawIO()
752 def bad_flush():
753 raise IOError('flush')
754 def bad_close():
755 raise IOError('close')
756 raw.close = bad_close
757 b = self.tp(raw)
758 b.flush = bad_flush
759 with self.assertRaises(IOError) as err: # exception not swallowed
760 b.close()
761 self.assertEqual(err.exception.args, ('close',))
762 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000763
764 def test_multi_close(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 b.close()
768 b.close()
769 b.close()
770 self.assertRaises(ValueError, b.flush)
771
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000772 def test_readonly_attributes(self):
773 raw = self.MockRawIO()
774 buf = self.tp(raw)
775 x = self.MockRawIO()
776 with self.assertRaises((AttributeError, TypeError)):
777 buf.raw = x
778
Christian Heimes1a6387e2008-03-26 12:49:49 +0000779
Antoine Pitroubff5df02012-07-29 19:02:46 +0200780class SizeofTest:
781
782 @support.cpython_only
783 def test_sizeof(self):
784 bufsize1 = 4096
785 bufsize2 = 8192
786 rawio = self.MockRawIO()
787 bufio = self.tp(rawio, buffer_size=bufsize1)
788 size = sys.getsizeof(bufio) - bufsize1
789 rawio = self.MockRawIO()
790 bufio = self.tp(rawio, buffer_size=bufsize2)
791 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
792
793
Antoine Pitrou19690592009-06-12 20:14:08 +0000794class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
795 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000796
Antoine Pitrou19690592009-06-12 20:14:08 +0000797 def test_constructor(self):
798 rawio = self.MockRawIO([b"abc"])
799 bufio = self.tp(rawio)
800 bufio.__init__(rawio)
801 bufio.__init__(rawio, buffer_size=1024)
802 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000803 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000804 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
805 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
806 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
807 rawio = self.MockRawIO([b"abc"])
808 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000809 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000810
Serhiy Storchaka1d19f972014-02-12 10:52:07 +0200811 def test_uninitialized(self):
812 bufio = self.tp.__new__(self.tp)
813 del bufio
814 bufio = self.tp.__new__(self.tp)
815 self.assertRaisesRegexp((ValueError, AttributeError),
816 'uninitialized|has no attribute',
817 bufio.read, 0)
818 bufio.__init__(self.MockRawIO())
819 self.assertEqual(bufio.read(0), b'')
820
Antoine Pitrou19690592009-06-12 20:14:08 +0000821 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000822 for arg in (None, 7):
823 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
824 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000825 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000826 # Invalid args
827 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000828
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 def test_read1(self):
830 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
831 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000832 self.assertEqual(b"a", bufio.read(1))
833 self.assertEqual(b"b", bufio.read1(1))
834 self.assertEqual(rawio._reads, 1)
835 self.assertEqual(b"c", bufio.read1(100))
836 self.assertEqual(rawio._reads, 1)
837 self.assertEqual(b"d", bufio.read1(100))
838 self.assertEqual(rawio._reads, 2)
839 self.assertEqual(b"efg", bufio.read1(100))
840 self.assertEqual(rawio._reads, 3)
841 self.assertEqual(b"", bufio.read1(100))
842 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000843 # Invalid args
844 self.assertRaises(ValueError, bufio.read1, -1)
845
846 def test_readinto(self):
847 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
848 bufio = self.tp(rawio)
849 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000850 self.assertEqual(bufio.readinto(b), 2)
851 self.assertEqual(b, b"ab")
852 self.assertEqual(bufio.readinto(b), 2)
853 self.assertEqual(b, b"cd")
854 self.assertEqual(bufio.readinto(b), 2)
855 self.assertEqual(b, b"ef")
856 self.assertEqual(bufio.readinto(b), 1)
857 self.assertEqual(b, b"gf")
858 self.assertEqual(bufio.readinto(b), 0)
859 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000860
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000861 def test_readlines(self):
862 def bufio():
863 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
864 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000865 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
866 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
867 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000868
Antoine Pitrou19690592009-06-12 20:14:08 +0000869 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000870 data = b"abcdefghi"
871 dlen = len(data)
872
873 tests = [
874 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
875 [ 100, [ 3, 3, 3], [ dlen ] ],
876 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
877 ]
878
879 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000880 rawio = self.MockFileIO(data)
881 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000882 pos = 0
883 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000884 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000885 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000886 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000887 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000888
Antoine Pitrou19690592009-06-12 20:14:08 +0000889 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000890 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000891 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
892 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000893 self.assertEqual(b"abcd", bufio.read(6))
894 self.assertEqual(b"e", bufio.read(1))
895 self.assertEqual(b"fg", bufio.read())
896 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200897 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000898 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000899
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200900 rawio = self.MockRawIO((b"a", None, None))
901 self.assertEqual(b"a", rawio.readall())
902 self.assertIsNone(rawio.readall())
903
Antoine Pitrou19690592009-06-12 20:14:08 +0000904 def test_read_past_eof(self):
905 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
906 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000907
Ezio Melotti2623a372010-11-21 13:34:58 +0000908 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000909
Antoine Pitrou19690592009-06-12 20:14:08 +0000910 def test_read_all(self):
911 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
912 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000913
Ezio Melotti2623a372010-11-21 13:34:58 +0000914 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000915
Victor Stinner6a102812010-04-27 23:55:59 +0000916 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000917 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000918 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000919 try:
920 # Write out many bytes with exactly the same number of 0's,
921 # 1's... 255's. This will help us check that concurrent reading
922 # doesn't duplicate or forget contents.
923 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000924 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000925 random.shuffle(l)
926 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000927 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000928 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000929 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000930 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000931 errors = []
932 results = []
933 def f():
934 try:
935 # Intra-buffer read then buffer-flushing read
936 for n in cycle([1, 19]):
937 s = bufio.read(n)
938 if not s:
939 break
940 # list.append() is atomic
941 results.append(s)
942 except Exception as e:
943 errors.append(e)
944 raise
945 threads = [threading.Thread(target=f) for x in range(20)]
946 for t in threads:
947 t.start()
948 time.sleep(0.02) # yield
949 for t in threads:
950 t.join()
951 self.assertFalse(errors,
952 "the following exceptions were caught: %r" % errors)
953 s = b''.join(results)
954 for i in range(256):
955 c = bytes(bytearray([i]))
956 self.assertEqual(s.count(c), N)
957 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000958 support.unlink(support.TESTFN)
959
960 def test_misbehaved_io(self):
961 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
962 bufio = self.tp(rawio)
963 self.assertRaises(IOError, bufio.seek, 0)
964 self.assertRaises(IOError, bufio.tell)
965
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000966 def test_no_extraneous_read(self):
967 # Issue #9550; when the raw IO object has satisfied the read request,
968 # we should not issue any additional reads, otherwise it may block
969 # (e.g. socket).
970 bufsize = 16
971 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
972 rawio = self.MockRawIO([b"x" * n])
973 bufio = self.tp(rawio, bufsize)
974 self.assertEqual(bufio.read(n), b"x" * n)
975 # Simple case: one raw read is enough to satisfy the request.
976 self.assertEqual(rawio._extraneous_reads, 0,
977 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
978 # A more complex case where two raw reads are needed to satisfy
979 # the request.
980 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
981 bufio = self.tp(rawio, bufsize)
982 self.assertEqual(bufio.read(n), b"x" * n)
983 self.assertEqual(rawio._extraneous_reads, 0,
984 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
985
986
Antoine Pitroubff5df02012-07-29 19:02:46 +0200987class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +0000988 tp = io.BufferedReader
989
990 def test_constructor(self):
991 BufferedReaderTest.test_constructor(self)
992 # The allocation can succeed on 32-bit builds, e.g. with more
993 # than 2GB RAM and a 64-bit kernel.
994 if sys.maxsize > 0x7FFFFFFF:
995 rawio = self.MockRawIO()
996 bufio = self.tp(rawio)
997 self.assertRaises((OverflowError, MemoryError, ValueError),
998 bufio.__init__, rawio, sys.maxsize)
999
1000 def test_initialization(self):
1001 rawio = self.MockRawIO([b"abc"])
1002 bufio = self.tp(rawio)
1003 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1004 self.assertRaises(ValueError, bufio.read)
1005 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1006 self.assertRaises(ValueError, bufio.read)
1007 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1008 self.assertRaises(ValueError, bufio.read)
1009
1010 def test_misbehaved_io_read(self):
1011 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1012 bufio = self.tp(rawio)
1013 # _pyio.BufferedReader seems to implement reading different, so that
1014 # checking this is not so easy.
1015 self.assertRaises(IOError, bufio.read, 10)
1016
1017 def test_garbage_collection(self):
1018 # C BufferedReader objects are collected.
1019 # The Python version has __del__, so it ends into gc.garbage instead
1020 rawio = self.FileIO(support.TESTFN, "w+b")
1021 f = self.tp(rawio)
1022 f.f = f
1023 wr = weakref.ref(f)
1024 del f
1025 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001026 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001027
R David Murray5b2cf5e2013-02-23 22:11:21 -05001028 def test_args_error(self):
1029 # Issue #17275
1030 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1031 self.tp(io.BytesIO(), 1024, 1024, 1024)
1032
1033
Antoine Pitrou19690592009-06-12 20:14:08 +00001034class PyBufferedReaderTest(BufferedReaderTest):
1035 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001036
1037
Antoine Pitrou19690592009-06-12 20:14:08 +00001038class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1039 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001040
Antoine Pitrou19690592009-06-12 20:14:08 +00001041 def test_constructor(self):
1042 rawio = self.MockRawIO()
1043 bufio = self.tp(rawio)
1044 bufio.__init__(rawio)
1045 bufio.__init__(rawio, buffer_size=1024)
1046 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001047 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001048 bufio.flush()
1049 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1050 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1051 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1052 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001053 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001054 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001055 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001056
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001057 def test_uninitialized(self):
1058 bufio = self.tp.__new__(self.tp)
1059 del bufio
1060 bufio = self.tp.__new__(self.tp)
1061 self.assertRaisesRegexp((ValueError, AttributeError),
1062 'uninitialized|has no attribute',
1063 bufio.write, b'')
1064 bufio.__init__(self.MockRawIO())
1065 self.assertEqual(bufio.write(b''), 0)
1066
Antoine Pitrou19690592009-06-12 20:14:08 +00001067 def test_detach_flush(self):
1068 raw = self.MockRawIO()
1069 buf = self.tp(raw)
1070 buf.write(b"howdy!")
1071 self.assertFalse(raw._write_stack)
1072 buf.detach()
1073 self.assertEqual(raw._write_stack, [b"howdy!"])
1074
1075 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001076 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 writer = self.MockRawIO()
1078 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001079 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001080 self.assertFalse(writer._write_stack)
1081
Antoine Pitrou19690592009-06-12 20:14:08 +00001082 def test_write_overflow(self):
1083 writer = self.MockRawIO()
1084 bufio = self.tp(writer, 8)
1085 contents = b"abcdefghijklmnop"
1086 for n in range(0, len(contents), 3):
1087 bufio.write(contents[n:n+3])
1088 flushed = b"".join(writer._write_stack)
1089 # At least (total - 8) bytes were implicitly flushed, perhaps more
1090 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001091 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001092
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 def check_writes(self, intermediate_func):
1094 # Lots of writes, test the flushed output is as expected.
1095 contents = bytes(range(256)) * 1000
1096 n = 0
1097 writer = self.MockRawIO()
1098 bufio = self.tp(writer, 13)
1099 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1100 def gen_sizes():
1101 for size in count(1):
1102 for i in range(15):
1103 yield size
1104 sizes = gen_sizes()
1105 while n < len(contents):
1106 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001107 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001108 intermediate_func(bufio)
1109 n += size
1110 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001111 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001112 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001113
Antoine Pitrou19690592009-06-12 20:14:08 +00001114 def test_writes(self):
1115 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001116
Antoine Pitrou19690592009-06-12 20:14:08 +00001117 def test_writes_and_flushes(self):
1118 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001119
Antoine Pitrou19690592009-06-12 20:14:08 +00001120 def test_writes_and_seeks(self):
1121 def _seekabs(bufio):
1122 pos = bufio.tell()
1123 bufio.seek(pos + 1, 0)
1124 bufio.seek(pos - 1, 0)
1125 bufio.seek(pos, 0)
1126 self.check_writes(_seekabs)
1127 def _seekrel(bufio):
1128 pos = bufio.seek(0, 1)
1129 bufio.seek(+1, 1)
1130 bufio.seek(-1, 1)
1131 bufio.seek(pos, 0)
1132 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001133
Antoine Pitrou19690592009-06-12 20:14:08 +00001134 def test_writes_and_truncates(self):
1135 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001136
Antoine Pitrou19690592009-06-12 20:14:08 +00001137 def test_write_non_blocking(self):
1138 raw = self.MockNonBlockWriterIO()
1139 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001140
Ezio Melotti2623a372010-11-21 13:34:58 +00001141 self.assertEqual(bufio.write(b"abcd"), 4)
1142 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001143 # 1 byte will be written, the rest will be buffered
1144 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001145 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001146
Antoine Pitrou19690592009-06-12 20:14:08 +00001147 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1148 raw.block_on(b"0")
1149 try:
1150 bufio.write(b"opqrwxyz0123456789")
1151 except self.BlockingIOError as e:
1152 written = e.characters_written
1153 else:
1154 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001155 self.assertEqual(written, 16)
1156 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001157 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001158
Ezio Melotti2623a372010-11-21 13:34:58 +00001159 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001160 s = raw.pop_written()
1161 # Previously buffered bytes were flushed
1162 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001163
Antoine Pitrou19690592009-06-12 20:14:08 +00001164 def test_write_and_rewind(self):
1165 raw = io.BytesIO()
1166 bufio = self.tp(raw, 4)
1167 self.assertEqual(bufio.write(b"abcdef"), 6)
1168 self.assertEqual(bufio.tell(), 6)
1169 bufio.seek(0, 0)
1170 self.assertEqual(bufio.write(b"XY"), 2)
1171 bufio.seek(6, 0)
1172 self.assertEqual(raw.getvalue(), b"XYcdef")
1173 self.assertEqual(bufio.write(b"123456"), 6)
1174 bufio.flush()
1175 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001176
Antoine Pitrou19690592009-06-12 20:14:08 +00001177 def test_flush(self):
1178 writer = self.MockRawIO()
1179 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001180 bufio.write(b"abc")
1181 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001182 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001183
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001184 def test_writelines(self):
1185 l = [b'ab', b'cd', b'ef']
1186 writer = self.MockRawIO()
1187 bufio = self.tp(writer, 8)
1188 bufio.writelines(l)
1189 bufio.flush()
1190 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1191
1192 def test_writelines_userlist(self):
1193 l = UserList([b'ab', b'cd', b'ef'])
1194 writer = self.MockRawIO()
1195 bufio = self.tp(writer, 8)
1196 bufio.writelines(l)
1197 bufio.flush()
1198 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1199
1200 def test_writelines_error(self):
1201 writer = self.MockRawIO()
1202 bufio = self.tp(writer, 8)
1203 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1204 self.assertRaises(TypeError, bufio.writelines, None)
1205
Antoine Pitrou19690592009-06-12 20:14:08 +00001206 def test_destructor(self):
1207 writer = self.MockRawIO()
1208 bufio = self.tp(writer, 8)
1209 bufio.write(b"abc")
1210 del bufio
1211 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001212 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001213
1214 def test_truncate(self):
1215 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001216 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001217 bufio = self.tp(raw, 8)
1218 bufio.write(b"abcdef")
1219 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001220 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001221 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001222 self.assertEqual(f.read(), b"abc")
1223
Victor Stinner6a102812010-04-27 23:55:59 +00001224 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001225 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001226 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001227 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001228 # Write out many bytes from many threads and test they were
1229 # all flushed.
1230 N = 1000
1231 contents = bytes(range(256)) * N
1232 sizes = cycle([1, 19])
1233 n = 0
1234 queue = deque()
1235 while n < len(contents):
1236 size = next(sizes)
1237 queue.append(contents[n:n+size])
1238 n += size
1239 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001240 # We use a real file object because it allows us to
1241 # exercise situations where the GIL is released before
1242 # writing the buffer to the raw streams. This is in addition
1243 # to concurrency issues due to switching threads in the middle
1244 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001245 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001246 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001247 errors = []
1248 def f():
1249 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001250 while True:
1251 try:
1252 s = queue.popleft()
1253 except IndexError:
1254 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001255 bufio.write(s)
1256 except Exception as e:
1257 errors.append(e)
1258 raise
1259 threads = [threading.Thread(target=f) for x in range(20)]
1260 for t in threads:
1261 t.start()
1262 time.sleep(0.02) # yield
1263 for t in threads:
1264 t.join()
1265 self.assertFalse(errors,
1266 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001267 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001268 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001269 s = f.read()
1270 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001271 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001272 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001273 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001274
Antoine Pitrou19690592009-06-12 20:14:08 +00001275 def test_misbehaved_io(self):
1276 rawio = self.MisbehavedRawIO()
1277 bufio = self.tp(rawio, 5)
1278 self.assertRaises(IOError, bufio.seek, 0)
1279 self.assertRaises(IOError, bufio.tell)
1280 self.assertRaises(IOError, bufio.write, b"abcdef")
1281
1282 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001283 with support.check_warnings(("max_buffer_size is deprecated",
1284 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001285 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001286
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001287 def test_write_error_on_close(self):
1288 raw = self.MockRawIO()
1289 def bad_write(b):
1290 raise IOError()
1291 raw.write = bad_write
1292 b = self.tp(raw)
1293 b.write(b'spam')
1294 self.assertRaises(IOError, b.close) # exception not swallowed
1295 self.assertTrue(b.closed)
1296
Antoine Pitrou19690592009-06-12 20:14:08 +00001297
Antoine Pitroubff5df02012-07-29 19:02:46 +02001298class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001299 tp = io.BufferedWriter
1300
1301 def test_constructor(self):
1302 BufferedWriterTest.test_constructor(self)
1303 # The allocation can succeed on 32-bit builds, e.g. with more
1304 # than 2GB RAM and a 64-bit kernel.
1305 if sys.maxsize > 0x7FFFFFFF:
1306 rawio = self.MockRawIO()
1307 bufio = self.tp(rawio)
1308 self.assertRaises((OverflowError, MemoryError, ValueError),
1309 bufio.__init__, rawio, sys.maxsize)
1310
1311 def test_initialization(self):
1312 rawio = self.MockRawIO()
1313 bufio = self.tp(rawio)
1314 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1315 self.assertRaises(ValueError, bufio.write, b"def")
1316 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1317 self.assertRaises(ValueError, bufio.write, b"def")
1318 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1319 self.assertRaises(ValueError, bufio.write, b"def")
1320
1321 def test_garbage_collection(self):
1322 # C BufferedWriter objects are collected, and collecting them flushes
1323 # all data to disk.
1324 # The Python version has __del__, so it ends into gc.garbage instead
1325 rawio = self.FileIO(support.TESTFN, "w+b")
1326 f = self.tp(rawio)
1327 f.write(b"123xxx")
1328 f.x = f
1329 wr = weakref.ref(f)
1330 del f
1331 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001332 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001333 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001334 self.assertEqual(f.read(), b"123xxx")
1335
R David Murray5b2cf5e2013-02-23 22:11:21 -05001336 def test_args_error(self):
1337 # Issue #17275
1338 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1339 self.tp(io.BytesIO(), 1024, 1024, 1024)
1340
Antoine Pitrou19690592009-06-12 20:14:08 +00001341
1342class PyBufferedWriterTest(BufferedWriterTest):
1343 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001344
1345class BufferedRWPairTest(unittest.TestCase):
1346
Antoine Pitrou19690592009-06-12 20:14:08 +00001347 def test_constructor(self):
1348 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001349 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001350
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001351 def test_uninitialized(self):
1352 pair = self.tp.__new__(self.tp)
1353 del pair
1354 pair = self.tp.__new__(self.tp)
1355 self.assertRaisesRegexp((ValueError, AttributeError),
1356 'uninitialized|has no attribute',
1357 pair.read, 0)
1358 self.assertRaisesRegexp((ValueError, AttributeError),
1359 'uninitialized|has no attribute',
1360 pair.write, b'')
1361 pair.__init__(self.MockRawIO(), self.MockRawIO())
1362 self.assertEqual(pair.read(0), b'')
1363 self.assertEqual(pair.write(b''), 0)
1364
Antoine Pitrou19690592009-06-12 20:14:08 +00001365 def test_detach(self):
1366 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1367 self.assertRaises(self.UnsupportedOperation, pair.detach)
1368
1369 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001370 with support.check_warnings(("max_buffer_size is deprecated",
1371 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001372 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001373
1374 def test_constructor_with_not_readable(self):
1375 class NotReadable(MockRawIO):
1376 def readable(self):
1377 return False
1378
1379 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1380
1381 def test_constructor_with_not_writeable(self):
1382 class NotWriteable(MockRawIO):
1383 def writable(self):
1384 return False
1385
1386 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1387
1388 def test_read(self):
1389 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1390
1391 self.assertEqual(pair.read(3), b"abc")
1392 self.assertEqual(pair.read(1), b"d")
1393 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001394 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1395 self.assertEqual(pair.read(None), b"abc")
1396
1397 def test_readlines(self):
1398 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1399 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1400 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1401 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001402
1403 def test_read1(self):
1404 # .read1() is delegated to the underlying reader object, so this test
1405 # can be shallow.
1406 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1407
1408 self.assertEqual(pair.read1(3), b"abc")
1409
1410 def test_readinto(self):
1411 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1412
1413 data = bytearray(5)
1414 self.assertEqual(pair.readinto(data), 5)
1415 self.assertEqual(data, b"abcde")
1416
1417 def test_write(self):
1418 w = self.MockRawIO()
1419 pair = self.tp(self.MockRawIO(), w)
1420
1421 pair.write(b"abc")
1422 pair.flush()
1423 pair.write(b"def")
1424 pair.flush()
1425 self.assertEqual(w._write_stack, [b"abc", b"def"])
1426
1427 def test_peek(self):
1428 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1429
1430 self.assertTrue(pair.peek(3).startswith(b"abc"))
1431 self.assertEqual(pair.read(3), b"abc")
1432
1433 def test_readable(self):
1434 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1435 self.assertTrue(pair.readable())
1436
1437 def test_writeable(self):
1438 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1439 self.assertTrue(pair.writable())
1440
1441 def test_seekable(self):
1442 # BufferedRWPairs are never seekable, even if their readers and writers
1443 # are.
1444 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1445 self.assertFalse(pair.seekable())
1446
1447 # .flush() is delegated to the underlying writer object and has been
1448 # tested in the test_write method.
1449
1450 def test_close_and_closed(self):
1451 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1452 self.assertFalse(pair.closed)
1453 pair.close()
1454 self.assertTrue(pair.closed)
1455
1456 def test_isatty(self):
1457 class SelectableIsAtty(MockRawIO):
1458 def __init__(self, isatty):
1459 MockRawIO.__init__(self)
1460 self._isatty = isatty
1461
1462 def isatty(self):
1463 return self._isatty
1464
1465 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1466 self.assertFalse(pair.isatty())
1467
1468 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1469 self.assertTrue(pair.isatty())
1470
1471 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1472 self.assertTrue(pair.isatty())
1473
1474 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1475 self.assertTrue(pair.isatty())
1476
1477class CBufferedRWPairTest(BufferedRWPairTest):
1478 tp = io.BufferedRWPair
1479
1480class PyBufferedRWPairTest(BufferedRWPairTest):
1481 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001482
1483
Antoine Pitrou19690592009-06-12 20:14:08 +00001484class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1485 read_mode = "rb+"
1486 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001487
Antoine Pitrou19690592009-06-12 20:14:08 +00001488 def test_constructor(self):
1489 BufferedReaderTest.test_constructor(self)
1490 BufferedWriterTest.test_constructor(self)
1491
Serhiy Storchaka1d19f972014-02-12 10:52:07 +02001492 def test_uninitialized(self):
1493 BufferedReaderTest.test_uninitialized(self)
1494 BufferedWriterTest.test_uninitialized(self)
1495
Antoine Pitrou19690592009-06-12 20:14:08 +00001496 def test_read_and_write(self):
1497 raw = self.MockRawIO((b"asdf", b"ghjk"))
1498 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001499
1500 self.assertEqual(b"as", rw.read(2))
1501 rw.write(b"ddd")
1502 rw.write(b"eee")
1503 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001504 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001505 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001506
Antoine Pitrou19690592009-06-12 20:14:08 +00001507 def test_seek_and_tell(self):
1508 raw = self.BytesIO(b"asdfghjkl")
1509 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001510
Ezio Melotti2623a372010-11-21 13:34:58 +00001511 self.assertEqual(b"as", rw.read(2))
1512 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001513 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001514 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001515
Antoine Pitrou808cec52011-08-20 15:40:58 +02001516 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001517 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001518 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001519 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001520 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001521 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001522 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001523 self.assertEqual(7, rw.tell())
1524 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001525 rw.flush()
1526 self.assertEqual(b"asdf123fl", raw.getvalue())
1527
Christian Heimes1a6387e2008-03-26 12:49:49 +00001528 self.assertRaises(TypeError, rw.seek, 0.0)
1529
Antoine Pitrou19690592009-06-12 20:14:08 +00001530 def check_flush_and_read(self, read_func):
1531 raw = self.BytesIO(b"abcdefghi")
1532 bufio = self.tp(raw)
1533
Ezio Melotti2623a372010-11-21 13:34:58 +00001534 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001535 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001536 self.assertEqual(b"ef", read_func(bufio, 2))
1537 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001538 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001539 self.assertEqual(6, bufio.tell())
1540 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001541 raw.seek(0, 0)
1542 raw.write(b"XYZ")
1543 # flush() resets the read buffer
1544 bufio.flush()
1545 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001546 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001547
1548 def test_flush_and_read(self):
1549 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1550
1551 def test_flush_and_readinto(self):
1552 def _readinto(bufio, n=-1):
1553 b = bytearray(n if n >= 0 else 9999)
1554 n = bufio.readinto(b)
1555 return bytes(b[:n])
1556 self.check_flush_and_read(_readinto)
1557
1558 def test_flush_and_peek(self):
1559 def _peek(bufio, n=-1):
1560 # This relies on the fact that the buffer can contain the whole
1561 # raw stream, otherwise peek() can return less.
1562 b = bufio.peek(n)
1563 if n != -1:
1564 b = b[:n]
1565 bufio.seek(len(b), 1)
1566 return b
1567 self.check_flush_and_read(_peek)
1568
1569 def test_flush_and_write(self):
1570 raw = self.BytesIO(b"abcdefghi")
1571 bufio = self.tp(raw)
1572
1573 bufio.write(b"123")
1574 bufio.flush()
1575 bufio.write(b"45")
1576 bufio.flush()
1577 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001578 self.assertEqual(b"12345fghi", raw.getvalue())
1579 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001580
1581 def test_threads(self):
1582 BufferedReaderTest.test_threads(self)
1583 BufferedWriterTest.test_threads(self)
1584
1585 def test_writes_and_peek(self):
1586 def _peek(bufio):
1587 bufio.peek(1)
1588 self.check_writes(_peek)
1589 def _peek(bufio):
1590 pos = bufio.tell()
1591 bufio.seek(-1, 1)
1592 bufio.peek(1)
1593 bufio.seek(pos, 0)
1594 self.check_writes(_peek)
1595
1596 def test_writes_and_reads(self):
1597 def _read(bufio):
1598 bufio.seek(-1, 1)
1599 bufio.read(1)
1600 self.check_writes(_read)
1601
1602 def test_writes_and_read1s(self):
1603 def _read1(bufio):
1604 bufio.seek(-1, 1)
1605 bufio.read1(1)
1606 self.check_writes(_read1)
1607
1608 def test_writes_and_readintos(self):
1609 def _read(bufio):
1610 bufio.seek(-1, 1)
1611 bufio.readinto(bytearray(1))
1612 self.check_writes(_read)
1613
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001614 def test_write_after_readahead(self):
1615 # Issue #6629: writing after the buffer was filled by readahead should
1616 # first rewind the raw stream.
1617 for overwrite_size in [1, 5]:
1618 raw = self.BytesIO(b"A" * 10)
1619 bufio = self.tp(raw, 4)
1620 # Trigger readahead
1621 self.assertEqual(bufio.read(1), b"A")
1622 self.assertEqual(bufio.tell(), 1)
1623 # Overwriting should rewind the raw stream if it needs so
1624 bufio.write(b"B" * overwrite_size)
1625 self.assertEqual(bufio.tell(), overwrite_size + 1)
1626 # If the write size was smaller than the buffer size, flush() and
1627 # check that rewind happens.
1628 bufio.flush()
1629 self.assertEqual(bufio.tell(), overwrite_size + 1)
1630 s = raw.getvalue()
1631 self.assertEqual(s,
1632 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1633
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001634 def test_write_rewind_write(self):
1635 # Various combinations of reading / writing / seeking backwards / writing again
1636 def mutate(bufio, pos1, pos2):
1637 assert pos2 >= pos1
1638 # Fill the buffer
1639 bufio.seek(pos1)
1640 bufio.read(pos2 - pos1)
1641 bufio.write(b'\x02')
1642 # This writes earlier than the previous write, but still inside
1643 # the buffer.
1644 bufio.seek(pos1)
1645 bufio.write(b'\x01')
1646
1647 b = b"\x80\x81\x82\x83\x84"
1648 for i in range(0, len(b)):
1649 for j in range(i, len(b)):
1650 raw = self.BytesIO(b)
1651 bufio = self.tp(raw, 100)
1652 mutate(bufio, i, j)
1653 bufio.flush()
1654 expected = bytearray(b)
1655 expected[j] = 2
1656 expected[i] = 1
1657 self.assertEqual(raw.getvalue(), expected,
1658 "failed result for i=%d, j=%d" % (i, j))
1659
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001660 def test_truncate_after_read_or_write(self):
1661 raw = self.BytesIO(b"A" * 10)
1662 bufio = self.tp(raw, 100)
1663 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1664 self.assertEqual(bufio.truncate(), 2)
1665 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1666 self.assertEqual(bufio.truncate(), 4)
1667
Antoine Pitrou19690592009-06-12 20:14:08 +00001668 def test_misbehaved_io(self):
1669 BufferedReaderTest.test_misbehaved_io(self)
1670 BufferedWriterTest.test_misbehaved_io(self)
1671
Antoine Pitrou808cec52011-08-20 15:40:58 +02001672 def test_interleaved_read_write(self):
1673 # Test for issue #12213
1674 with self.BytesIO(b'abcdefgh') as raw:
1675 with self.tp(raw, 100) as f:
1676 f.write(b"1")
1677 self.assertEqual(f.read(1), b'b')
1678 f.write(b'2')
1679 self.assertEqual(f.read1(1), b'd')
1680 f.write(b'3')
1681 buf = bytearray(1)
1682 f.readinto(buf)
1683 self.assertEqual(buf, b'f')
1684 f.write(b'4')
1685 self.assertEqual(f.peek(1), b'h')
1686 f.flush()
1687 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1688
1689 with self.BytesIO(b'abc') as raw:
1690 with self.tp(raw, 100) as f:
1691 self.assertEqual(f.read(1), b'a')
1692 f.write(b"2")
1693 self.assertEqual(f.read(1), b'c')
1694 f.flush()
1695 self.assertEqual(raw.getvalue(), b'a2c')
1696
1697 def test_interleaved_readline_write(self):
1698 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1699 with self.tp(raw) as f:
1700 f.write(b'1')
1701 self.assertEqual(f.readline(), b'b\n')
1702 f.write(b'2')
1703 self.assertEqual(f.readline(), b'def\n')
1704 f.write(b'3')
1705 self.assertEqual(f.readline(), b'\n')
1706 f.flush()
1707 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1708
R David Murray5b2cf5e2013-02-23 22:11:21 -05001709
Antoine Pitroubff5df02012-07-29 19:02:46 +02001710class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1711 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001712 tp = io.BufferedRandom
1713
1714 def test_constructor(self):
1715 BufferedRandomTest.test_constructor(self)
1716 # The allocation can succeed on 32-bit builds, e.g. with more
1717 # than 2GB RAM and a 64-bit kernel.
1718 if sys.maxsize > 0x7FFFFFFF:
1719 rawio = self.MockRawIO()
1720 bufio = self.tp(rawio)
1721 self.assertRaises((OverflowError, MemoryError, ValueError),
1722 bufio.__init__, rawio, sys.maxsize)
1723
1724 def test_garbage_collection(self):
1725 CBufferedReaderTest.test_garbage_collection(self)
1726 CBufferedWriterTest.test_garbage_collection(self)
1727
R David Murray5b2cf5e2013-02-23 22:11:21 -05001728 def test_args_error(self):
1729 # Issue #17275
1730 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1731 self.tp(io.BytesIO(), 1024, 1024, 1024)
1732
1733
Antoine Pitrou19690592009-06-12 20:14:08 +00001734class PyBufferedRandomTest(BufferedRandomTest):
1735 tp = pyio.BufferedRandom
1736
1737
Christian Heimes1a6387e2008-03-26 12:49:49 +00001738# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1739# properties:
1740# - A single output character can correspond to many bytes of input.
1741# - The number of input bytes to complete the character can be
1742# undetermined until the last input byte is received.
1743# - The number of input bytes can vary depending on previous input.
1744# - A single input byte can correspond to many characters of output.
1745# - The number of output characters can be undetermined until the
1746# last input byte is received.
1747# - The number of output characters can vary depending on previous input.
1748
1749class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1750 """
1751 For testing seek/tell behavior with a stateful, buffering decoder.
1752
1753 Input is a sequence of words. Words may be fixed-length (length set
1754 by input) or variable-length (period-terminated). In variable-length
1755 mode, extra periods are ignored. Possible words are:
1756 - 'i' followed by a number sets the input length, I (maximum 99).
1757 When I is set to 0, words are space-terminated.
1758 - 'o' followed by a number sets the output length, O (maximum 99).
1759 - Any other word is converted into a word followed by a period on
1760 the output. The output word consists of the input word truncated
1761 or padded out with hyphens to make its length equal to O. If O
1762 is 0, the word is output verbatim without truncating or padding.
1763 I and O are initially set to 1. When I changes, any buffered input is
1764 re-scanned according to the new I. EOF also terminates the last word.
1765 """
1766
1767 def __init__(self, errors='strict'):
1768 codecs.IncrementalDecoder.__init__(self, errors)
1769 self.reset()
1770
1771 def __repr__(self):
1772 return '<SID %x>' % id(self)
1773
1774 def reset(self):
1775 self.i = 1
1776 self.o = 1
1777 self.buffer = bytearray()
1778
1779 def getstate(self):
1780 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1781 return bytes(self.buffer), i*100 + o
1782
1783 def setstate(self, state):
1784 buffer, io = state
1785 self.buffer = bytearray(buffer)
1786 i, o = divmod(io, 100)
1787 self.i, self.o = i ^ 1, o ^ 1
1788
1789 def decode(self, input, final=False):
1790 output = ''
1791 for b in input:
1792 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001793 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001794 if self.buffer:
1795 output += self.process_word()
1796 else:
1797 self.buffer.append(b)
1798 else: # fixed-length, terminate after self.i bytes
1799 self.buffer.append(b)
1800 if len(self.buffer) == self.i:
1801 output += self.process_word()
1802 if final and self.buffer: # EOF terminates the last word
1803 output += self.process_word()
1804 return output
1805
1806 def process_word(self):
1807 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001808 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001809 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001810 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001811 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1812 else:
1813 output = self.buffer.decode('ascii')
1814 if len(output) < self.o:
1815 output += '-'*self.o # pad out with hyphens
1816 if self.o:
1817 output = output[:self.o] # truncate to output length
1818 output += '.'
1819 self.buffer = bytearray()
1820 return output
1821
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001822 codecEnabled = False
1823
1824 @classmethod
1825 def lookupTestDecoder(cls, name):
1826 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001827 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001828 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001829 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001830 incrementalencoder=None,
1831 streamreader=None, streamwriter=None,
1832 incrementaldecoder=cls)
1833
1834# Register the previous decoder for testing.
1835# Disabled by default, tests will enable it.
1836codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1837
1838
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839class StatefulIncrementalDecoderTest(unittest.TestCase):
1840 """
1841 Make sure the StatefulIncrementalDecoder actually works.
1842 """
1843
1844 test_cases = [
1845 # I=1, O=1 (fixed-length input == fixed-length output)
1846 (b'abcd', False, 'a.b.c.d.'),
1847 # I=0, O=0 (variable-length input, variable-length output)
1848 (b'oiabcd', True, 'abcd.'),
1849 # I=0, O=0 (should ignore extra periods)
1850 (b'oi...abcd...', True, 'abcd.'),
1851 # I=0, O=6 (variable-length input, fixed-length output)
1852 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1853 # I=2, O=6 (fixed-length input < fixed-length output)
1854 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1855 # I=6, O=3 (fixed-length input > fixed-length output)
1856 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1857 # I=0, then 3; O=29, then 15 (with longer output)
1858 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1859 'a----------------------------.' +
1860 'b----------------------------.' +
1861 'cde--------------------------.' +
1862 'abcdefghijabcde.' +
1863 'a.b------------.' +
1864 '.c.------------.' +
1865 'd.e------------.' +
1866 'k--------------.' +
1867 'l--------------.' +
1868 'm--------------.')
1869 ]
1870
Antoine Pitrou19690592009-06-12 20:14:08 +00001871 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001872 # Try a few one-shot test cases.
1873 for input, eof, output in self.test_cases:
1874 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001875 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876
1877 # Also test an unfinished decode, followed by forcing EOF.
1878 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001879 self.assertEqual(d.decode(b'oiabcd'), '')
1880 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001881
1882class TextIOWrapperTest(unittest.TestCase):
1883
1884 def setUp(self):
1885 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1886 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001887 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001888
1889 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001890 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001891
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 def test_constructor(self):
1893 r = self.BytesIO(b"\xc3\xa9\n\n")
1894 b = self.BufferedReader(r, 1000)
1895 t = self.TextIOWrapper(b)
1896 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001897 self.assertEqual(t.encoding, "latin1")
1898 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001899 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001900 self.assertEqual(t.encoding, "utf8")
1901 self.assertEqual(t.line_buffering, True)
1902 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001903 self.assertRaises(TypeError, t.__init__, b, newline=42)
1904 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1905
1906 def test_detach(self):
1907 r = self.BytesIO()
1908 b = self.BufferedWriter(r)
1909 t = self.TextIOWrapper(b)
1910 self.assertIs(t.detach(), b)
1911
1912 t = self.TextIOWrapper(b, encoding="ascii")
1913 t.write("howdy")
1914 self.assertFalse(r.getvalue())
1915 t.detach()
1916 self.assertEqual(r.getvalue(), b"howdy")
1917 self.assertRaises(ValueError, t.detach)
1918
1919 def test_repr(self):
1920 raw = self.BytesIO("hello".encode("utf-8"))
1921 b = self.BufferedReader(raw)
1922 t = self.TextIOWrapper(b, encoding="utf-8")
1923 modname = self.TextIOWrapper.__module__
1924 self.assertEqual(repr(t),
1925 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1926 raw.name = "dummy"
1927 self.assertEqual(repr(t),
1928 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1929 raw.name = b"dummy"
1930 self.assertEqual(repr(t),
1931 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1932
1933 def test_line_buffering(self):
1934 r = self.BytesIO()
1935 b = self.BufferedWriter(r, 1000)
1936 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1937 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001938 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001939 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001940 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001941 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001942 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943
Antoine Pitrou19690592009-06-12 20:14:08 +00001944 def test_encoding(self):
1945 # Check the encoding attribute is always set, and valid
1946 b = self.BytesIO()
1947 t = self.TextIOWrapper(b, encoding="utf8")
1948 self.assertEqual(t.encoding, "utf8")
1949 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001950 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001951 codecs.lookup(t.encoding)
1952
1953 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001954 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001955 b = self.BytesIO(b"abc\n\xff\n")
1956 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001957 self.assertRaises(UnicodeError, t.read)
1958 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001959 b = self.BytesIO(b"abc\n\xff\n")
1960 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001961 self.assertRaises(UnicodeError, t.read)
1962 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001963 b = self.BytesIO(b"abc\n\xff\n")
1964 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001965 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001966 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001967 b = self.BytesIO(b"abc\n\xff\n")
1968 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001969 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001970
Antoine Pitrou19690592009-06-12 20:14:08 +00001971 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001973 b = self.BytesIO()
1974 t = self.TextIOWrapper(b, encoding="ascii")
1975 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001976 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001977 b = self.BytesIO()
1978 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1979 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001980 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001981 b = self.BytesIO()
1982 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001986 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001987 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001988 b = self.BytesIO()
1989 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001990 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001991 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001993 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001994
Antoine Pitrou19690592009-06-12 20:14:08 +00001995 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001996 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1997
1998 tests = [
1999 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2000 [ '', input_lines ],
2001 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2002 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2003 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2004 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002005 encodings = (
2006 'utf-8', 'latin-1',
2007 'utf-16', 'utf-16-le', 'utf-16-be',
2008 'utf-32', 'utf-32-le', 'utf-32-be',
2009 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00002010
2011 # Try a range of buffer sizes to test the case where \r is the last
2012 # character in TextIOWrapper._pending_line.
2013 for encoding in encodings:
2014 # XXX: str.encode() should return bytes
2015 data = bytes(''.join(input_lines).encode(encoding))
2016 for do_reads in (False, True):
2017 for bufsize in range(1, 10):
2018 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002019 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2020 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021 encoding=encoding)
2022 if do_reads:
2023 got_lines = []
2024 while True:
2025 c2 = textio.read(2)
2026 if c2 == '':
2027 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002028 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002029 got_lines.append(c2 + textio.readline())
2030 else:
2031 got_lines = list(textio)
2032
2033 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002034 self.assertEqual(got_line, exp_line)
2035 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002036
Antoine Pitrou19690592009-06-12 20:14:08 +00002037 def test_newlines_input(self):
2038 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002039 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2040 for newline, expected in [
2041 (None, normalized.decode("ascii").splitlines(True)),
2042 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002043 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2044 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2045 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002046 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002047 buf = self.BytesIO(testdata)
2048 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002049 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002050 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002051 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052
Antoine Pitrou19690592009-06-12 20:14:08 +00002053 def test_newlines_output(self):
2054 testdict = {
2055 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2056 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2057 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2058 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2059 }
2060 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2061 for newline, expected in tests:
2062 buf = self.BytesIO()
2063 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2064 txt.write("AAA\nB")
2065 txt.write("BB\nCCC\n")
2066 txt.write("X\rY\r\nZ")
2067 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002068 self.assertEqual(buf.closed, False)
2069 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002070
2071 def test_destructor(self):
2072 l = []
2073 base = self.BytesIO
2074 class MyBytesIO(base):
2075 def close(self):
2076 l.append(self.getvalue())
2077 base.close(self)
2078 b = MyBytesIO()
2079 t = self.TextIOWrapper(b, encoding="ascii")
2080 t.write("abc")
2081 del t
2082 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002083 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002084
2085 def test_override_destructor(self):
2086 record = []
2087 class MyTextIO(self.TextIOWrapper):
2088 def __del__(self):
2089 record.append(1)
2090 try:
2091 f = super(MyTextIO, self).__del__
2092 except AttributeError:
2093 pass
2094 else:
2095 f()
2096 def close(self):
2097 record.append(2)
2098 super(MyTextIO, self).close()
2099 def flush(self):
2100 record.append(3)
2101 super(MyTextIO, self).flush()
2102 b = self.BytesIO()
2103 t = MyTextIO(b, encoding="ascii")
2104 del t
2105 support.gc_collect()
2106 self.assertEqual(record, [1, 2, 3])
2107
2108 def test_error_through_destructor(self):
2109 # Test that the exception state is not modified by a destructor,
2110 # even if close() fails.
2111 rawio = self.CloseFailureIO()
2112 def f():
2113 self.TextIOWrapper(rawio).xyzzy
2114 with support.captured_output("stderr") as s:
2115 self.assertRaises(AttributeError, f)
2116 s = s.getvalue().strip()
2117 if s:
2118 # The destructor *may* have printed an unraisable error, check it
2119 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002120 self.assertTrue(s.startswith("Exception IOError: "), s)
2121 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002122
2123 # Systematic tests of the text I/O API
2124
Antoine Pitrou19690592009-06-12 20:14:08 +00002125 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002126 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2127 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002128 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002129 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002130 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002131 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002132 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002133 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002134 self.assertEqual(f.tell(), 0)
2135 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002136 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002137 self.assertEqual(f.seek(0), 0)
2138 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002139 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002140 self.assertEqual(f.read(2), "ab")
2141 self.assertEqual(f.read(1), "c")
2142 self.assertEqual(f.read(1), "")
2143 self.assertEqual(f.read(), "")
2144 self.assertEqual(f.tell(), cookie)
2145 self.assertEqual(f.seek(0), 0)
2146 self.assertEqual(f.seek(0, 2), cookie)
2147 self.assertEqual(f.write("def"), 3)
2148 self.assertEqual(f.seek(cookie), cookie)
2149 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002150 if enc.startswith("utf"):
2151 self.multi_line_test(f, enc)
2152 f.close()
2153
2154 def multi_line_test(self, f, enc):
2155 f.seek(0)
2156 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002157 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002158 wlines = []
2159 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2160 chars = []
2161 for i in range(size):
2162 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002163 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002164 wlines.append((f.tell(), line))
2165 f.write(line)
2166 f.seek(0)
2167 rlines = []
2168 while True:
2169 pos = f.tell()
2170 line = f.readline()
2171 if not line:
2172 break
2173 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002174 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002175
Antoine Pitrou19690592009-06-12 20:14:08 +00002176 def test_telling(self):
2177 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002179 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002180 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002181 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002182 p2 = f.tell()
2183 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002184 self.assertEqual(f.tell(), p0)
2185 self.assertEqual(f.readline(), "\xff\n")
2186 self.assertEqual(f.tell(), p1)
2187 self.assertEqual(f.readline(), "\xff\n")
2188 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002189 f.seek(0)
2190 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002191 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002192 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002193 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002194 f.close()
2195
Antoine Pitrou19690592009-06-12 20:14:08 +00002196 def test_seeking(self):
2197 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002198 prefix_size = chunk_size - 2
2199 u_prefix = "a" * prefix_size
2200 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002201 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002202 u_suffix = "\u8888\n"
2203 suffix = bytes(u_suffix.encode("utf-8"))
2204 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002205 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002206 f.write(line*2)
2207 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002208 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002209 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002210 self.assertEqual(s, prefix.decode("ascii"))
2211 self.assertEqual(f.tell(), prefix_size)
2212 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002213
Antoine Pitrou19690592009-06-12 20:14:08 +00002214 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002215 # Regression test for a specific bug
2216 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002217 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002218 f.write(data)
2219 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002220 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002221 f._CHUNK_SIZE # Just test that it exists
2222 f._CHUNK_SIZE = 2
2223 f.readline()
2224 f.tell()
2225
Antoine Pitrou19690592009-06-12 20:14:08 +00002226 def test_seek_and_tell(self):
2227 #Test seek/tell using the StatefulIncrementalDecoder.
2228 # Make test faster by doing smaller seeks
2229 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002230
Antoine Pitrou19690592009-06-12 20:14:08 +00002231 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002232 """Tell/seek to various points within a data stream and ensure
2233 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002234 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235 f.write(data)
2236 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002237 f = self.open(support.TESTFN, encoding='test_decoder')
2238 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002239 decoded = f.read()
2240 f.close()
2241
2242 for i in range(min_pos, len(decoded) + 1): # seek positions
2243 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002244 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002245 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002246 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002247 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002248 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002249 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002250 f.close()
2251
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002252 # Enable the test decoder.
2253 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002254
2255 # Run the tests.
2256 try:
2257 # Try each test case.
2258 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002259 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002260
2261 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002262 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2263 offset = CHUNK_SIZE - len(input)//2
2264 prefix = b'.'*offset
2265 # Don't bother seeking into the prefix (takes too long).
2266 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002267 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002268
2269 # Ensure our test decoder won't interfere with subsequent tests.
2270 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002271 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002272
Antoine Pitrou19690592009-06-12 20:14:08 +00002273 def test_encoded_writes(self):
2274 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002275 tests = ("utf-16",
2276 "utf-16-le",
2277 "utf-16-be",
2278 "utf-32",
2279 "utf-32-le",
2280 "utf-32-be")
2281 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002282 buf = self.BytesIO()
2283 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002284 # Check if the BOM is written only once (see issue1753).
2285 f.write(data)
2286 f.write(data)
2287 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002288 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002289 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002290 self.assertEqual(f.read(), data * 2)
2291 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002292
Antoine Pitrou19690592009-06-12 20:14:08 +00002293 def test_unreadable(self):
2294 class UnReadable(self.BytesIO):
2295 def readable(self):
2296 return False
2297 txt = self.TextIOWrapper(UnReadable())
2298 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002299
Antoine Pitrou19690592009-06-12 20:14:08 +00002300 def test_read_one_by_one(self):
2301 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002302 reads = ""
2303 while True:
2304 c = txt.read(1)
2305 if not c:
2306 break
2307 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002308 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002309
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002310 def test_readlines(self):
2311 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2312 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2313 txt.seek(0)
2314 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2315 txt.seek(0)
2316 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2317
Christian Heimes1a6387e2008-03-26 12:49:49 +00002318 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002319 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002320 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002321 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002322 reads = ""
2323 while True:
2324 c = txt.read(128)
2325 if not c:
2326 break
2327 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002328 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002329
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002330 def test_writelines(self):
2331 l = ['ab', 'cd', 'ef']
2332 buf = self.BytesIO()
2333 txt = self.TextIOWrapper(buf)
2334 txt.writelines(l)
2335 txt.flush()
2336 self.assertEqual(buf.getvalue(), b'abcdef')
2337
2338 def test_writelines_userlist(self):
2339 l = UserList(['ab', 'cd', 'ef'])
2340 buf = self.BytesIO()
2341 txt = self.TextIOWrapper(buf)
2342 txt.writelines(l)
2343 txt.flush()
2344 self.assertEqual(buf.getvalue(), b'abcdef')
2345
2346 def test_writelines_error(self):
2347 txt = self.TextIOWrapper(self.BytesIO())
2348 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2349 self.assertRaises(TypeError, txt.writelines, None)
2350 self.assertRaises(TypeError, txt.writelines, b'abc')
2351
Christian Heimes1a6387e2008-03-26 12:49:49 +00002352 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002353 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002354
2355 # read one char at a time
2356 reads = ""
2357 while True:
2358 c = txt.read(1)
2359 if not c:
2360 break
2361 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002362 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002363
2364 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002365 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002366 txt._CHUNK_SIZE = 4
2367
2368 reads = ""
2369 while True:
2370 c = txt.read(4)
2371 if not c:
2372 break
2373 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002374 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002375
2376 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002377 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002378 txt._CHUNK_SIZE = 4
2379
2380 reads = txt.read(4)
2381 reads += txt.read(4)
2382 reads += txt.readline()
2383 reads += txt.readline()
2384 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002385 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002386
2387 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002388 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002389 txt._CHUNK_SIZE = 4
2390
2391 reads = txt.read(4)
2392 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002393 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002394
2395 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002396 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002397 txt._CHUNK_SIZE = 4
2398
2399 reads = txt.read(4)
2400 pos = txt.tell()
2401 txt.seek(0)
2402 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002403 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002404
2405 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002406 buffer = self.BytesIO(self.testdata)
2407 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002408
2409 self.assertEqual(buffer.seekable(), txt.seekable())
2410
Antoine Pitrou19690592009-06-12 20:14:08 +00002411 def test_append_bom(self):
2412 # The BOM is not written again when appending to a non-empty file
2413 filename = support.TESTFN
2414 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2415 with self.open(filename, 'w', encoding=charset) as f:
2416 f.write('aaa')
2417 pos = f.tell()
2418 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002419 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002420
2421 with self.open(filename, 'a', encoding=charset) as f:
2422 f.write('xxx')
2423 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002424 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002425
Antoine Pitrou19690592009-06-12 20:14:08 +00002426 def test_seek_bom(self):
2427 # Same test, but when seeking manually
2428 filename = support.TESTFN
2429 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2430 with self.open(filename, 'w', encoding=charset) as f:
2431 f.write('aaa')
2432 pos = f.tell()
2433 with self.open(filename, 'r+', encoding=charset) as f:
2434 f.seek(pos)
2435 f.write('zzz')
2436 f.seek(0)
2437 f.write('bbb')
2438 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002439 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002440
2441 def test_errors_property(self):
2442 with self.open(support.TESTFN, "w") as f:
2443 self.assertEqual(f.errors, "strict")
2444 with self.open(support.TESTFN, "w", errors="replace") as f:
2445 self.assertEqual(f.errors, "replace")
2446
Victor Stinner6a102812010-04-27 23:55:59 +00002447 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002448 def test_threads_write(self):
2449 # Issue6750: concurrent writes could duplicate data
2450 event = threading.Event()
2451 with self.open(support.TESTFN, "w", buffering=1) as f:
2452 def run(n):
2453 text = "Thread%03d\n" % n
2454 event.wait()
2455 f.write(text)
2456 threads = [threading.Thread(target=lambda n=x: run(n))
2457 for x in range(20)]
2458 for t in threads:
2459 t.start()
2460 time.sleep(0.02)
2461 event.set()
2462 for t in threads:
2463 t.join()
2464 with self.open(support.TESTFN) as f:
2465 content = f.read()
2466 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002467 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002468
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002469 def test_flush_error_on_close(self):
2470 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2471 def bad_flush():
2472 raise IOError()
2473 txt.flush = bad_flush
2474 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002475 self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002476
2477 def test_multi_close(self):
2478 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2479 txt.close()
2480 txt.close()
2481 txt.close()
2482 self.assertRaises(ValueError, txt.flush)
2483
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002484 def test_readonly_attributes(self):
2485 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2486 buf = self.BytesIO(self.testdata)
2487 with self.assertRaises((AttributeError, TypeError)):
2488 txt.buffer = buf
2489
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002490 def test_read_nonbytes(self):
2491 # Issue #17106
2492 # Crash when underlying read() returns non-bytes
2493 class NonbytesStream(self.StringIO):
2494 read1 = self.StringIO.read
2495 class NonbytesStream(self.StringIO):
2496 read1 = self.StringIO.read
2497 t = self.TextIOWrapper(NonbytesStream('a'))
2498 with self.maybeRaises(TypeError):
2499 t.read(1)
2500 t = self.TextIOWrapper(NonbytesStream('a'))
2501 with self.maybeRaises(TypeError):
2502 t.readline()
2503 t = self.TextIOWrapper(NonbytesStream('a'))
2504 self.assertEqual(t.read(), u'a')
2505
2506 def test_illegal_decoder(self):
2507 # Issue #17106
2508 # Crash when decoder returns non-string
2509 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2510 encoding='quopri_codec')
2511 with self.maybeRaises(TypeError):
2512 t.read(1)
2513 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2514 encoding='quopri_codec')
2515 with self.maybeRaises(TypeError):
2516 t.readline()
2517 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2518 encoding='quopri_codec')
2519 with self.maybeRaises(TypeError):
2520 t.read()
2521
2522
Antoine Pitrou19690592009-06-12 20:14:08 +00002523class CTextIOWrapperTest(TextIOWrapperTest):
2524
2525 def test_initialization(self):
2526 r = self.BytesIO(b"\xc3\xa9\n\n")
2527 b = self.BufferedReader(r, 1000)
2528 t = self.TextIOWrapper(b)
2529 self.assertRaises(TypeError, t.__init__, b, newline=42)
2530 self.assertRaises(ValueError, t.read)
2531 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2532 self.assertRaises(ValueError, t.read)
2533
2534 def test_garbage_collection(self):
2535 # C TextIOWrapper objects are collected, and collecting them flushes
2536 # all data to disk.
2537 # The Python version has __del__, so it ends in gc.garbage instead.
2538 rawio = io.FileIO(support.TESTFN, "wb")
2539 b = self.BufferedWriter(rawio)
2540 t = self.TextIOWrapper(b, encoding="ascii")
2541 t.write("456def")
2542 t.x = t
2543 wr = weakref.ref(t)
2544 del t
2545 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002546 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002547 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002548 self.assertEqual(f.read(), b"456def")
2549
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002550 def test_rwpair_cleared_before_textio(self):
2551 # Issue 13070: TextIOWrapper's finalization would crash when called
2552 # after the reference to the underlying BufferedRWPair's writer got
2553 # cleared by the GC.
2554 for i in range(1000):
2555 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2556 t1 = self.TextIOWrapper(b1, encoding="ascii")
2557 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2558 t2 = self.TextIOWrapper(b2, encoding="ascii")
2559 # circular references
2560 t1.buddy = t2
2561 t2.buddy = t1
2562 support.gc_collect()
2563
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002564 maybeRaises = unittest.TestCase.assertRaises
2565
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002566
Antoine Pitrou19690592009-06-12 20:14:08 +00002567class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002568 @contextlib.contextmanager
2569 def maybeRaises(self, *args, **kwds):
2570 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002571
2572
2573class IncrementalNewlineDecoderTest(unittest.TestCase):
2574
2575 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002576 # UTF-8 specific tests for a newline decoder
2577 def _check_decode(b, s, **kwargs):
2578 # We exercise getstate() / setstate() as well as decode()
2579 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002580 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002581 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002582 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002583
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002584 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002585
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002586 _check_decode(b'\xe8', "")
2587 _check_decode(b'\xa2', "")
2588 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002589
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002590 _check_decode(b'\xe8', "")
2591 _check_decode(b'\xa2', "")
2592 _check_decode(b'\x88', "\u8888")
2593
2594 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002595 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2596
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002597 decoder.reset()
2598 _check_decode(b'\n', "\n")
2599 _check_decode(b'\r', "")
2600 _check_decode(b'', "\n", final=True)
2601 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002602
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002603 _check_decode(b'\r', "")
2604 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002605
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002606 _check_decode(b'\r\r\n', "\n\n")
2607 _check_decode(b'\r', "")
2608 _check_decode(b'\r', "\n")
2609 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002610
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002611 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2612 _check_decode(b'\xe8\xa2\x88', "\u8888")
2613 _check_decode(b'\n', "\n")
2614 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2615 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002616
Antoine Pitrou19690592009-06-12 20:14:08 +00002617 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002618 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002619 if encoding is not None:
2620 encoder = codecs.getincrementalencoder(encoding)()
2621 def _decode_bytewise(s):
2622 # Decode one byte at a time
2623 for b in encoder.encode(s):
2624 result.append(decoder.decode(b))
2625 else:
2626 encoder = None
2627 def _decode_bytewise(s):
2628 # Decode one char at a time
2629 for c in s:
2630 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002631 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002632 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002633 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002634 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002635 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002636 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002637 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002638 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002639 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002640 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002641 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002642 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002643 input = "abc"
2644 if encoder is not None:
2645 encoder.reset()
2646 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002647 self.assertEqual(decoder.decode(input), "abc")
2648 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002649
2650 def test_newline_decoder(self):
2651 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002652 # None meaning the IncrementalNewlineDecoder takes unicode input
2653 # rather than bytes input
2654 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002655 'utf-16', 'utf-16-le', 'utf-16-be',
2656 'utf-32', 'utf-32-le', 'utf-32-be',
2657 )
2658 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002659 decoder = enc and codecs.getincrementaldecoder(enc)()
2660 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2661 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002662 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002663 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2664 self.check_newline_decoding_utf8(decoder)
2665
2666 def test_newline_bytes(self):
2667 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2668 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002669 self.assertEqual(dec.newlines, None)
2670 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2671 self.assertEqual(dec.newlines, None)
2672 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2673 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002674 dec = self.IncrementalNewlineDecoder(None, translate=False)
2675 _check(dec)
2676 dec = self.IncrementalNewlineDecoder(None, translate=True)
2677 _check(dec)
2678
2679class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2680 pass
2681
2682class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2683 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002684
Christian Heimes1a6387e2008-03-26 12:49:49 +00002685
2686# XXX Tests for open()
2687
2688class MiscIOTest(unittest.TestCase):
2689
Benjamin Petersonad100c32008-11-20 22:06:22 +00002690 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002691 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002692
Antoine Pitrou19690592009-06-12 20:14:08 +00002693 def test___all__(self):
2694 for name in self.io.__all__:
2695 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002696 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002697 if name == "open":
2698 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002699 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002700 self.assertTrue(issubclass(obj, Exception), name)
2701 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002702 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002703
Benjamin Petersonad100c32008-11-20 22:06:22 +00002704 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002705 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002706 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002707 f.close()
2708
Antoine Pitrou19690592009-06-12 20:14:08 +00002709 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002710 self.assertEqual(f.name, support.TESTFN)
2711 self.assertEqual(f.buffer.name, support.TESTFN)
2712 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2713 self.assertEqual(f.mode, "U")
2714 self.assertEqual(f.buffer.mode, "rb")
2715 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002716 f.close()
2717
Antoine Pitrou19690592009-06-12 20:14:08 +00002718 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002719 self.assertEqual(f.mode, "w+")
2720 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2721 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002722
Antoine Pitrou19690592009-06-12 20:14:08 +00002723 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002724 self.assertEqual(g.mode, "wb")
2725 self.assertEqual(g.raw.mode, "wb")
2726 self.assertEqual(g.name, f.fileno())
2727 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002728 f.close()
2729 g.close()
2730
Antoine Pitrou19690592009-06-12 20:14:08 +00002731 def test_io_after_close(self):
2732 for kwargs in [
2733 {"mode": "w"},
2734 {"mode": "wb"},
2735 {"mode": "w", "buffering": 1},
2736 {"mode": "w", "buffering": 2},
2737 {"mode": "wb", "buffering": 0},
2738 {"mode": "r"},
2739 {"mode": "rb"},
2740 {"mode": "r", "buffering": 1},
2741 {"mode": "r", "buffering": 2},
2742 {"mode": "rb", "buffering": 0},
2743 {"mode": "w+"},
2744 {"mode": "w+b"},
2745 {"mode": "w+", "buffering": 1},
2746 {"mode": "w+", "buffering": 2},
2747 {"mode": "w+b", "buffering": 0},
2748 ]:
2749 f = self.open(support.TESTFN, **kwargs)
2750 f.close()
2751 self.assertRaises(ValueError, f.flush)
2752 self.assertRaises(ValueError, f.fileno)
2753 self.assertRaises(ValueError, f.isatty)
2754 self.assertRaises(ValueError, f.__iter__)
2755 if hasattr(f, "peek"):
2756 self.assertRaises(ValueError, f.peek, 1)
2757 self.assertRaises(ValueError, f.read)
2758 if hasattr(f, "read1"):
2759 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002760 if hasattr(f, "readall"):
2761 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002762 if hasattr(f, "readinto"):
2763 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2764 self.assertRaises(ValueError, f.readline)
2765 self.assertRaises(ValueError, f.readlines)
2766 self.assertRaises(ValueError, f.seek, 0)
2767 self.assertRaises(ValueError, f.tell)
2768 self.assertRaises(ValueError, f.truncate)
2769 self.assertRaises(ValueError, f.write,
2770 b"" if "b" in kwargs['mode'] else "")
2771 self.assertRaises(ValueError, f.writelines, [])
2772 self.assertRaises(ValueError, next, f)
2773
2774 def test_blockingioerror(self):
2775 # Various BlockingIOError issues
2776 self.assertRaises(TypeError, self.BlockingIOError)
2777 self.assertRaises(TypeError, self.BlockingIOError, 1)
2778 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2779 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2780 b = self.BlockingIOError(1, "")
2781 self.assertEqual(b.characters_written, 0)
2782 class C(unicode):
2783 pass
2784 c = C("")
2785 b = self.BlockingIOError(1, c)
2786 c.b = b
2787 b.c = c
2788 wr = weakref.ref(c)
2789 del c, b
2790 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002791 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002792
2793 def test_abcs(self):
2794 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002795 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2796 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2797 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2798 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002799
2800 def _check_abc_inheritance(self, abcmodule):
2801 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002802 self.assertIsInstance(f, abcmodule.IOBase)
2803 self.assertIsInstance(f, abcmodule.RawIOBase)
2804 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2805 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002806 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002807 self.assertIsInstance(f, abcmodule.IOBase)
2808 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2809 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2810 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002811 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002812 self.assertIsInstance(f, abcmodule.IOBase)
2813 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2814 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2815 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002816
2817 def test_abc_inheritance(self):
2818 # Test implementations inherit from their respective ABCs
2819 self._check_abc_inheritance(self)
2820
2821 def test_abc_inheritance_official(self):
2822 # Test implementations inherit from the official ABCs of the
2823 # baseline "io" module.
2824 self._check_abc_inheritance(io)
2825
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002826 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2827 def test_nonblock_pipe_write_bigbuf(self):
2828 self._test_nonblock_pipe_write(16*1024)
2829
2830 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2831 def test_nonblock_pipe_write_smallbuf(self):
2832 self._test_nonblock_pipe_write(1024)
2833
2834 def _set_non_blocking(self, fd):
2835 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2836 self.assertNotEqual(flags, -1)
2837 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2838 self.assertEqual(res, 0)
2839
2840 def _test_nonblock_pipe_write(self, bufsize):
2841 sent = []
2842 received = []
2843 r, w = os.pipe()
2844 self._set_non_blocking(r)
2845 self._set_non_blocking(w)
2846
2847 # To exercise all code paths in the C implementation we need
2848 # to play with buffer sizes. For instance, if we choose a
2849 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2850 # then we will never get a partial write of the buffer.
2851 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2852 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2853
2854 with rf, wf:
2855 for N in 9999, 73, 7574:
2856 try:
2857 i = 0
2858 while True:
2859 msg = bytes([i % 26 + 97]) * N
2860 sent.append(msg)
2861 wf.write(msg)
2862 i += 1
2863
2864 except self.BlockingIOError as e:
2865 self.assertEqual(e.args[0], errno.EAGAIN)
2866 sent[-1] = sent[-1][:e.characters_written]
2867 received.append(rf.read())
2868 msg = b'BLOCKED'
2869 wf.write(msg)
2870 sent.append(msg)
2871
2872 while True:
2873 try:
2874 wf.flush()
2875 break
2876 except self.BlockingIOError as e:
2877 self.assertEqual(e.args[0], errno.EAGAIN)
2878 self.assertEqual(e.characters_written, 0)
2879 received.append(rf.read())
2880
2881 received += iter(rf.read, None)
2882
2883 sent, received = b''.join(sent), b''.join(received)
2884 self.assertTrue(sent == received)
2885 self.assertTrue(wf.closed)
2886 self.assertTrue(rf.closed)
2887
Antoine Pitrou19690592009-06-12 20:14:08 +00002888class CMiscIOTest(MiscIOTest):
2889 io = io
2890
2891class PyMiscIOTest(MiscIOTest):
2892 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002893
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002894
2895@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2896class SignalsTest(unittest.TestCase):
2897
2898 def setUp(self):
2899 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2900
2901 def tearDown(self):
2902 signal.signal(signal.SIGALRM, self.oldalrm)
2903
2904 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002905 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002906
2907 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002908 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2909 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002910 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2911 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002912 invokes the signal handler, and bubbles up the exception raised
2913 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002914 read_results = []
2915 def _read():
2916 s = os.read(r, 1)
2917 read_results.append(s)
2918 t = threading.Thread(target=_read)
2919 t.daemon = True
2920 r, w = os.pipe()
2921 try:
2922 wio = self.io.open(w, **fdopen_kwargs)
2923 t.start()
2924 signal.alarm(1)
2925 # Fill the pipe enough that the write will be blocking.
2926 # It will be interrupted by the timer armed above. Since the
2927 # other thread has read one byte, the low-level write will
2928 # return with a successful (partial) result rather than an EINTR.
2929 # The buffered IO layer must check for pending signal
2930 # handlers, which in this case will invoke alarm_interrupt().
2931 self.assertRaises(ZeroDivisionError,
Antoine Pitrou68915d72013-04-24 23:31:38 +02002932 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002933 t.join()
2934 # We got one byte, get another one and check that it isn't a
2935 # repeat of the first one.
2936 read_results.append(os.read(r, 1))
2937 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2938 finally:
2939 os.close(w)
2940 os.close(r)
2941 # This is deliberate. If we didn't close the file descriptor
2942 # before closing wio, wio would try to flush its internal
2943 # buffer, and block again.
2944 try:
2945 wio.close()
2946 except IOError as e:
2947 if e.errno != errno.EBADF:
2948 raise
2949
2950 def test_interrupted_write_unbuffered(self):
2951 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2952
2953 def test_interrupted_write_buffered(self):
2954 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2955
2956 def test_interrupted_write_text(self):
2957 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2958
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002959 def check_reentrant_write(self, data, **fdopen_kwargs):
2960 def on_alarm(*args):
2961 # Will be called reentrantly from the same thread
2962 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002963 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002964 signal.signal(signal.SIGALRM, on_alarm)
2965 r, w = os.pipe()
2966 wio = self.io.open(w, **fdopen_kwargs)
2967 try:
2968 signal.alarm(1)
2969 # Either the reentrant call to wio.write() fails with RuntimeError,
2970 # or the signal handler raises ZeroDivisionError.
2971 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2972 while 1:
2973 for i in range(100):
2974 wio.write(data)
2975 wio.flush()
2976 # Make sure the buffer doesn't fill up and block further writes
2977 os.read(r, len(data) * 100)
2978 exc = cm.exception
2979 if isinstance(exc, RuntimeError):
2980 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2981 finally:
2982 wio.close()
2983 os.close(r)
2984
2985 def test_reentrant_write_buffered(self):
2986 self.check_reentrant_write(b"xy", mode="wb")
2987
2988 def test_reentrant_write_text(self):
2989 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2990
Antoine Pitrou6439c002011-02-25 21:35:47 +00002991 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2992 """Check that a buffered read, when it gets interrupted (either
2993 returning a partial result or EINTR), properly invokes the signal
2994 handler and retries if the latter returned successfully."""
2995 r, w = os.pipe()
2996 fdopen_kwargs["closefd"] = False
2997 def alarm_handler(sig, frame):
2998 os.write(w, b"bar")
2999 signal.signal(signal.SIGALRM, alarm_handler)
3000 try:
3001 rio = self.io.open(r, **fdopen_kwargs)
3002 os.write(w, b"foo")
3003 signal.alarm(1)
3004 # Expected behaviour:
3005 # - first raw read() returns partial b"foo"
3006 # - second raw read() returns EINTR
3007 # - third raw read() returns b"bar"
3008 self.assertEqual(decode(rio.read(6)), "foobar")
3009 finally:
3010 rio.close()
3011 os.close(w)
3012 os.close(r)
3013
3014 def test_interrupterd_read_retry_buffered(self):
3015 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3016 mode="rb")
3017
3018 def test_interrupterd_read_retry_text(self):
3019 self.check_interrupted_read_retry(lambda x: x,
3020 mode="r")
3021
3022 @unittest.skipUnless(threading, 'Threading required for this test.')
3023 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3024 """Check that a buffered write, when it gets interrupted (either
3025 returning a partial result or EINTR), properly invokes the signal
3026 handler and retries if the latter returned successfully."""
3027 select = support.import_module("select")
3028 # A quantity that exceeds the buffer size of an anonymous pipe's
3029 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003030 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003031 r, w = os.pipe()
3032 fdopen_kwargs["closefd"] = False
3033 # We need a separate thread to read from the pipe and allow the
3034 # write() to finish. This thread is started after the SIGALRM is
3035 # received (forcing a first EINTR in write()).
3036 read_results = []
3037 write_finished = False
3038 def _read():
3039 while not write_finished:
3040 while r in select.select([r], [], [], 1.0)[0]:
3041 s = os.read(r, 1024)
3042 read_results.append(s)
3043 t = threading.Thread(target=_read)
3044 t.daemon = True
3045 def alarm1(sig, frame):
3046 signal.signal(signal.SIGALRM, alarm2)
3047 signal.alarm(1)
3048 def alarm2(sig, frame):
3049 t.start()
3050 signal.signal(signal.SIGALRM, alarm1)
3051 try:
3052 wio = self.io.open(w, **fdopen_kwargs)
3053 signal.alarm(1)
3054 # Expected behaviour:
3055 # - first raw write() is partial (because of the limited pipe buffer
3056 # and the first alarm)
3057 # - second raw write() returns EINTR (because of the second alarm)
3058 # - subsequent write()s are successful (either partial or complete)
3059 self.assertEqual(N, wio.write(item * N))
3060 wio.flush()
3061 write_finished = True
3062 t.join()
3063 self.assertEqual(N, sum(len(x) for x in read_results))
3064 finally:
3065 write_finished = True
3066 os.close(w)
3067 os.close(r)
3068 # This is deliberate. If we didn't close the file descriptor
3069 # before closing wio, wio would try to flush its internal
3070 # buffer, and could block (in case of failure).
3071 try:
3072 wio.close()
3073 except IOError as e:
3074 if e.errno != errno.EBADF:
3075 raise
3076
3077 def test_interrupterd_write_retry_buffered(self):
3078 self.check_interrupted_write_retry(b"x", mode="wb")
3079
3080 def test_interrupterd_write_retry_text(self):
3081 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3082
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003083
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003084class CSignalsTest(SignalsTest):
3085 io = io
3086
3087class PySignalsTest(SignalsTest):
3088 io = pyio
3089
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003090 # Handling reentrancy issues would slow down _pyio even more, so the
3091 # tests are disabled.
3092 test_reentrant_write_buffered = None
3093 test_reentrant_write_text = None
3094
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003095
Christian Heimes1a6387e2008-03-26 12:49:49 +00003096def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003097 tests = (CIOTest, PyIOTest,
3098 CBufferedReaderTest, PyBufferedReaderTest,
3099 CBufferedWriterTest, PyBufferedWriterTest,
3100 CBufferedRWPairTest, PyBufferedRWPairTest,
3101 CBufferedRandomTest, PyBufferedRandomTest,
3102 StatefulIncrementalDecoderTest,
3103 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3104 CTextIOWrapperTest, PyTextIOWrapperTest,
3105 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003106 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003107 )
3108
3109 # Put the namespaces of the IO module we are testing and some useful mock
3110 # classes in the __dict__ of each test.
3111 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003112 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003113 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3114 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3115 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3116 globs = globals()
3117 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3118 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3119 # Avoid turning open into a bound method.
3120 py_io_ns["open"] = pyio.OpenWrapper
3121 for test in tests:
3122 if test.__name__.startswith("C"):
3123 for name, obj in c_io_ns.items():
3124 setattr(test, name, obj)
3125 elif test.__name__.startswith("Py"):
3126 for name, obj in py_io_ns.items():
3127 setattr(test, name, obj)
3128
3129 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003130
3131if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003132 test_main()