blob: 936dbed64a47a421d0540404a74f4072946830ac [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################

from __future__ import print_function
from __future__ import unicode_literals

import os
import sys
import time
import array
import random
import unittest
import weakref
import abc
import signal
import errno
from itertools import cycle, count
from collections import deque
from UserList import UserList
from test import test_support as support
import contextlib

import codecs
import io  # C implementation of io
import _pyio as pyio  # Python implementation of io
# threading and fcntl are optional; the names are bound to None when the
# module cannot be imported.
try:
    import threading
except ImportError:
    threading = None
try:
    import fcntl
except ImportError:
    fcntl = None

# Make every class statement in this module create a new-style class.
__metaclass__ = type
# Rebind the module-level name ``bytes`` to the test-support helper.
bytes = support.py3k_bytes
55
56def _default_chunk_size():
57 """Get the default TextIOWrapper chunk size"""
58 with io.open(__file__, "r", encoding="latin1") as f:
59 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000060
61
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is a sequence of chunks handed out, in order, by
    readinto().  A ``None`` entry makes readinto() return ``None`` once.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Number of readinto() calls, and how many of them happened after
        # the read stack had already been exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, ``None`` when the next queued
        entry is ``None``, or 0 when nothing is queued any more.  A chunk
        larger than *buf* is split: the part that did not fit stays at the
        head of the queue for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Only a prefix fits: fill the buffer and keep the remainder queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos

class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
123
124
class MockRawIO(MockRawIOWithoutRead):
    """Mock raw stream that adds a read() serving whole queued chunks."""

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Once the queue is empty, b"" is returned and the call is counted
        in ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "queue exhausted" is expected here; anything else
            # (including KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""

class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000140
141
class MisbehavedRawIO(MockRawIO):
    """Mock raw stream whose methods deliberately return bogus results."""

    def write(self, b):
        # Reports twice as many bytes as were actually recorded.
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        # Returns the queued chunk repeated twice.
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Performs the real readinto() but reports five times the buffer
        # length, whatever was actually stored.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5

class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
164
165
class CloseFailureIO(MockRawIO):
    """Mock raw stream whose first close() raises IOError."""

    # Class-level default; close() overrides it on the instance.
    closed = 0

    def close(self):
        # Mark the stream closed *before* raising, so that any later
        # close() call is a silent no-op.
        if not self.closed:
            self.closed = 1
            raise IOError

class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
179
180
class MockFileIO:
    """Mixin recording the result size of every read()/readinto() call.

    It must be combined with a base class that provides the real
    ``__init__(data)``, ``read()`` and ``readinto()`` (the concrete
    variants below use the C and Python BytesIO).
    """

    def __init__(self, data):
        # Set up the history first, then let the real stream initialise.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        res = super(MockFileIO, self).read(n)
        # Log the length of what was read, or None if read() returned None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super(MockFileIO, self).readinto(b)
        self.read_history.append(res)
        return res

class CMockFileIO(MockFileIO, io.BytesIO):
    pass

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
202
203
class MockNonBlockWriterIO:
    """Mock raw stream that can simulate a non-blocking write stalling.

    After block_on(char), the next write() whose data contains *char*
    is cut short just before it; a write() whose data *starts* with it
    returns None ("would block") and disarms the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the blocker char if one is armed.

        Returns the number of bytes accepted, or None to signal that the
        write would block.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present in this chunk: write it all below.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)

class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
253
Christian Heimes1a6387e2008-03-26 12:49:49 +0000254
class IOTest(unittest.TestCase):
    # Implementation-agnostic tests of the basic io API.  The methods reach
    # the implementation under test through attributes on ``self``
    # (``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    # ``self.MockRawIOWithoutRead``, ...) that this class does not define;
    # presumably they are set on the concrete subclasses elsewhere in the
    # file -- verify before running this class on its own.
    #
    # NOTE: test methods carry comments rather than docstrings on purpose,
    # since unittest displays a test's docstring instead of its name.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed sequence of write/seek/truncate operations on
        # the writable stream *f*, leaving b"hello world\n" as its content.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: run a fixed sequence of read/readinto/seek operations on
        # the readable stream *f*, whose content must be b"hello world\n".
        # With buffered=True, argument-less read() is exercised as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the LARGE offset of *f*.
        # Use unittest assertions rather than bare ``assert`` so the checks
        # are not stripped when running with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, both on normal
        # exit and when the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying a FileIO subclass must call __del__, close() and
        # flush() in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: check that destroying an instance of a subclass of *base*
        # calls __del__, close() and flush() in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() of an array must report the array's size in bytes.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second stream on the same descriptor; closing it must not
            # close the descriptor, but must make further reads fail.
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.read(), "egg\n")
            fobj.seek(0)
            fobj.close()
            self.assertRaises(ValueError, fobj.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so only the cyclic GC can reclaim it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
605
606
class CIOTest(IOTest):
    # IOTest variant holding the tests specific to the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000626
class PyIOTest(IOTest):
    # IOTest variant for the Python implementation; the inherited
    # test_array_writes is replaced by a skipped version of itself.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000631
632
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: it relies on ``self.tp`` (the buffered class under
    # test), ``self.MockRawIO`` and ``self.CloseFailureIO`` as well as the
    # unittest assertion methods, none of which are defined here.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to detach.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass instance must call __del__ and close(), and
        # also flush() when the stream is writable, in that order.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # The repr shows the class name, plus the raw stream's name once
        # it has one.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the error from
        # close() is the one reported and the stream is not marked closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError('flush')
        def bad_close():
            raise IOError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(IOError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed stream is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_readonly_attributes(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buf.raw = x
763
Christian Heimes1a6387e2008-03-26 12:49:49 +0000764
class SizeofTest:
    # Mixin: relies on ``self.tp``, ``self.MockRawIO`` and the unittest
    # assertion methods, none of which are defined here.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() of a buffered object must grow by exactly the
        # difference in buffer_size.
        bufsize1 = 4096
        bufsize2 = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize1)
        # Size of the object apart from its buffer.
        size = sys.getsizeof(bufio) - bufsize1
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize2)
        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
777
778
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side checks; concrete subclasses bind the class under test
    # to `tp`.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        reader = self.tp(rawio)
        # Calling __init__ again with a valid size must leave a usable object.
        reader.__init__(rawio)
        reader.__init__(rawio, buffer_size=1024)
        reader.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", reader.read())
        # Zero and negative buffer sizes are refused.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        reader.__init__(rawio)
        self.assertEqual(b"abc", reader.read())

    def test_uninitialized(self):
        # Dropping an object that never went through __init__ must be safe.
        reader = self.tp.__new__(self.tp)
        del reader
        reader = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        pattern = 'uninitialized|has no attribute'
        self.assertRaisesRegexp(expected_errors, pattern, reader.read, 0)
        reader.__init__(self.MockRawIO())
        self.assertEqual(reader.read(0), b'')

    def test_read(self):
        for size in (None, 7):
            bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (size passed to read1, expected data, raw reads issued so far)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (bytes reported, buffer contents); a short read leaves the tail
        # of the target untouched.
        expectations = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for nread, contents in expectations:
            self.assertEqual(bufio.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), every_line[:2])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        payload = b"abcdefghi"
        total = len(payload)

        # (buffer size, sizes passed to read(), expected raw read history)
        scenarios = (
            (100, (3, 1, 4, 8), [total, 0]),
            (100, (3, 3, 3), [total]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, requested, raw_history in scenarios:
            rawio = self.MockFileIO(payload)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                stop = offset + nbytes
                self.assertEqual(bufio.read(nbytes), payload[offset:stop])
                offset = stop
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_history)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        bufio = self.tp(self.MockRawIO(chunks))
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock itself: data first, then None on the following call.
        raw_only = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_only.readall())
        self.assertIsNone(raw_only.readall())

    def test_read_past_eof(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def drain():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                workers = [threading.Thread(target=drain)
                           for _ in range(20)]
                for worker in workers:
                    worker.start()
                time.sleep(0.02) # yield
                for worker in workers:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # First layout: one raw read is enough to satisfy the request.
            # Second layout: two raw reads are needed to satisfy it.
            for layout in ([wanted], [b"x" * (n - 1), b"x"]):
                rawio = self.MockRawIO(layout)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), wanted)
                extra = rawio._extraneous_reads
                self.assertEqual(extra, 0,
                     "failed for {}: {} != 0".format(n, extra))
970
971
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests against io.BufferedReader and adds checks
    # that are only asserted for this implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The FileIO below creates TESTFN on disk; register its removal up
        # front so the file does not outlive the test, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Reference cycle: only the cyclic collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1017
1018
class PyBufferedReaderTest(BufferedReaderTest):
    # Binds the shared reader tests to pyio.BufferedReader (presumably the
    # pure-Python implementation, going by the module alias).
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001021
1022
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side checks; concrete subclasses bind the class under test
    # to `tp`.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Dropping an object that never went through __init__ must be safe.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes must not disturb the output.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        # The write must fail with BlockingIOError; the exception reports
        # how many bytes were accepted before blocking.
        with self.assertRaises(self.BlockingIOError) as cm:
            bufio.write(b"opqrwxyz0123456789")
        written = cm.exception.characters_written
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Destroying the buffered object must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test writes TESTFN to disk; register its removal up front so
        # the file is deleted whether the assertions pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1281
Antoine Pitrou19690592009-06-12 20:14:08 +00001282
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests against io.BufferedWriter and adds checks
    # that are only asserted for this implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The FileIO below creates TESTFN on disk; register its removal up
        # front so the file does not outlive the test, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Reference cycle: only the cyclic collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1325
Antoine Pitrou19690592009-06-12 20:14:08 +00001326
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the shared writer tests to pyio.BufferedWriter (presumably the
    # pure-Python implementation, going by the module alias).
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001329
class BufferedRWPairTest(unittest.TestCase):
    # Checks for the reader/writer pair wrapper; concrete subclasses bind
    # the class under test to `tp`.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # Dropping an object that never went through __init__ must be safe.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                pair.read, 0)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None must behave like an omitted size argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # Previously this line repeated the no-argument call above; check an
        # explicit None hint instead, as the reader tests do.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith().
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1460 self.assertTrue(pair.isatty())
1461
class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared pair tests to io.BufferedRWPair.
    tp = io.BufferedRWPair
1464
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared pair tests to pyio.BufferedRWPair (presumably the
    # pure-Python implementation, going by the module alias).
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001467
1468
Antoine Pitrou19690592009-06-12 20:14:08 +00001469class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1470 read_mode = "rb+"
1471 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001472
    def test_constructor(self):
        # Run both parents' constructor checks explicitly, reader first.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)
1476
    def test_uninitialized(self):
        # Run both parents' uninitialized-object checks, reader first.
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)
1480
Antoine Pitrou19690592009-06-12 20:14:08 +00001481 def test_read_and_write(self):
1482 raw = self.MockRawIO((b"asdf", b"ghjk"))
1483 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001484
1485 self.assertEqual(b"as", rw.read(2))
1486 rw.write(b"ddd")
1487 rw.write(b"eee")
1488 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001489 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001490 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001491
Antoine Pitrou19690592009-06-12 20:14:08 +00001492 def test_seek_and_tell(self):
1493 raw = self.BytesIO(b"asdfghjkl")
1494 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001495
Ezio Melotti2623a372010-11-21 13:34:58 +00001496 self.assertEqual(b"as", rw.read(2))
1497 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001498 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001499 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001500
Antoine Pitrou808cec52011-08-20 15:40:58 +02001501 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001502 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001503 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001504 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001505 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001506 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001507 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001508 self.assertEqual(7, rw.tell())
1509 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001510 rw.flush()
1511 self.assertEqual(b"asdf123fl", raw.getvalue())
1512
Christian Heimes1a6387e2008-03-26 12:49:49 +00001513 self.assertRaises(TypeError, rw.seek, 0.0)
1514
Antoine Pitrou19690592009-06-12 20:14:08 +00001515 def check_flush_and_read(self, read_func):
1516 raw = self.BytesIO(b"abcdefghi")
1517 bufio = self.tp(raw)
1518
Ezio Melotti2623a372010-11-21 13:34:58 +00001519 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001520 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001521 self.assertEqual(b"ef", read_func(bufio, 2))
1522 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001523 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001524 self.assertEqual(6, bufio.tell())
1525 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001526 raw.seek(0, 0)
1527 raw.write(b"XYZ")
1528 # flush() resets the read buffer
1529 bufio.flush()
1530 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001531 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001532
1533 def test_flush_and_read(self):
1534 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1535
1536 def test_flush_and_readinto(self):
1537 def _readinto(bufio, n=-1):
1538 b = bytearray(n if n >= 0 else 9999)
1539 n = bufio.readinto(b)
1540 return bytes(b[:n])
1541 self.check_flush_and_read(_readinto)
1542
1543 def test_flush_and_peek(self):
1544 def _peek(bufio, n=-1):
1545 # This relies on the fact that the buffer can contain the whole
1546 # raw stream, otherwise peek() can return less.
1547 b = bufio.peek(n)
1548 if n != -1:
1549 b = b[:n]
1550 bufio.seek(len(b), 1)
1551 return b
1552 self.check_flush_and_read(_peek)
1553
1554 def test_flush_and_write(self):
1555 raw = self.BytesIO(b"abcdefghi")
1556 bufio = self.tp(raw)
1557
1558 bufio.write(b"123")
1559 bufio.flush()
1560 bufio.write(b"45")
1561 bufio.flush()
1562 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001563 self.assertEqual(b"12345fghi", raw.getvalue())
1564 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001565
1566 def test_threads(self):
1567 BufferedReaderTest.test_threads(self)
1568 BufferedWriterTest.test_threads(self)
1569
1570 def test_writes_and_peek(self):
1571 def _peek(bufio):
1572 bufio.peek(1)
1573 self.check_writes(_peek)
1574 def _peek(bufio):
1575 pos = bufio.tell()
1576 bufio.seek(-1, 1)
1577 bufio.peek(1)
1578 bufio.seek(pos, 0)
1579 self.check_writes(_peek)
1580
1581 def test_writes_and_reads(self):
1582 def _read(bufio):
1583 bufio.seek(-1, 1)
1584 bufio.read(1)
1585 self.check_writes(_read)
1586
1587 def test_writes_and_read1s(self):
1588 def _read1(bufio):
1589 bufio.seek(-1, 1)
1590 bufio.read1(1)
1591 self.check_writes(_read1)
1592
1593 def test_writes_and_readintos(self):
1594 def _read(bufio):
1595 bufio.seek(-1, 1)
1596 bufio.readinto(bytearray(1))
1597 self.check_writes(_read)
1598
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001599 def test_write_after_readahead(self):
1600 # Issue #6629: writing after the buffer was filled by readahead should
1601 # first rewind the raw stream.
1602 for overwrite_size in [1, 5]:
1603 raw = self.BytesIO(b"A" * 10)
1604 bufio = self.tp(raw, 4)
1605 # Trigger readahead
1606 self.assertEqual(bufio.read(1), b"A")
1607 self.assertEqual(bufio.tell(), 1)
1608 # Overwriting should rewind the raw stream if it needs so
1609 bufio.write(b"B" * overwrite_size)
1610 self.assertEqual(bufio.tell(), overwrite_size + 1)
1611 # If the write size was smaller than the buffer size, flush() and
1612 # check that rewind happens.
1613 bufio.flush()
1614 self.assertEqual(bufio.tell(), overwrite_size + 1)
1615 s = raw.getvalue()
1616 self.assertEqual(s,
1617 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1618
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001619 def test_write_rewind_write(self):
1620 # Various combinations of reading / writing / seeking backwards / writing again
1621 def mutate(bufio, pos1, pos2):
1622 assert pos2 >= pos1
1623 # Fill the buffer
1624 bufio.seek(pos1)
1625 bufio.read(pos2 - pos1)
1626 bufio.write(b'\x02')
1627 # This writes earlier than the previous write, but still inside
1628 # the buffer.
1629 bufio.seek(pos1)
1630 bufio.write(b'\x01')
1631
1632 b = b"\x80\x81\x82\x83\x84"
1633 for i in range(0, len(b)):
1634 for j in range(i, len(b)):
1635 raw = self.BytesIO(b)
1636 bufio = self.tp(raw, 100)
1637 mutate(bufio, i, j)
1638 bufio.flush()
1639 expected = bytearray(b)
1640 expected[j] = 2
1641 expected[i] = 1
1642 self.assertEqual(raw.getvalue(), expected,
1643 "failed result for i=%d, j=%d" % (i, j))
1644
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001645 def test_truncate_after_read_or_write(self):
1646 raw = self.BytesIO(b"A" * 10)
1647 bufio = self.tp(raw, 100)
1648 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1649 self.assertEqual(bufio.truncate(), 2)
1650 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1651 self.assertEqual(bufio.truncate(), 4)
1652
Antoine Pitrou19690592009-06-12 20:14:08 +00001653 def test_misbehaved_io(self):
1654 BufferedReaderTest.test_misbehaved_io(self)
1655 BufferedWriterTest.test_misbehaved_io(self)
1656
Antoine Pitrou808cec52011-08-20 15:40:58 +02001657 def test_interleaved_read_write(self):
1658 # Test for issue #12213
1659 with self.BytesIO(b'abcdefgh') as raw:
1660 with self.tp(raw, 100) as f:
1661 f.write(b"1")
1662 self.assertEqual(f.read(1), b'b')
1663 f.write(b'2')
1664 self.assertEqual(f.read1(1), b'd')
1665 f.write(b'3')
1666 buf = bytearray(1)
1667 f.readinto(buf)
1668 self.assertEqual(buf, b'f')
1669 f.write(b'4')
1670 self.assertEqual(f.peek(1), b'h')
1671 f.flush()
1672 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1673
1674 with self.BytesIO(b'abc') as raw:
1675 with self.tp(raw, 100) as f:
1676 self.assertEqual(f.read(1), b'a')
1677 f.write(b"2")
1678 self.assertEqual(f.read(1), b'c')
1679 f.flush()
1680 self.assertEqual(raw.getvalue(), b'a2c')
1681
1682 def test_interleaved_readline_write(self):
1683 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1684 with self.tp(raw) as f:
1685 f.write(b'1')
1686 self.assertEqual(f.readline(), b'b\n')
1687 f.write(b'2')
1688 self.assertEqual(f.readline(), b'def\n')
1689 f.write(b'3')
1690 self.assertEqual(f.readline(), b'\n')
1691 f.flush()
1692 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1693
R David Murray5b2cf5e2013-02-23 22:11:21 -05001694
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom checks against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegexp(TypeError, "BufferedRandom",
                                self.tp, io.BytesIO(), 1024, 1024, 1024)
1717
1718
# Runs every BufferedRandomTest case with ``tp`` bound to the ``pyio``
# module's BufferedRandom.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1721
1722
Christian Heimes1a6387e2008-03-26 12:49:49 +00001723# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1724# properties:
1725# - A single output character can correspond to many bytes of input.
1726# - The number of input bytes to complete the character can be
1727# undetermined until the last input byte is received.
1728# - The number of input bytes can vary depending on previous input.
1729# - A single input byte can correspond to many characters of output.
1730# - The number of output characters can be undetermined until the
1731# last input byte is received.
1732# - The number of output characters can vary depending on previous input.
1733
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)``; flags pack I and O into one int."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* to the decoder and return the text
        produced so far.  When *final* is true, a partially buffered word
        is flushed as well.
        """
        period = ord('.')
        output = ''
        # Iterating a bytearray yields integers on every Python version,
        # whereas iterating a byte string yields 1-character strings on
        # 2.x and integers on 3.x.  Normalizing here keeps the period test
        # and the buffer appends correct on both.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words only update I or O and produce no text.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Gate for lookupTestDecoder(); it stays off unless a test turns it on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' to this class.

        Returns None while codecEnabled is false or for any other name.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1818
# Register the previous decoder's lookup function so that the codec name
# 'test_decoder' can resolve to StatefulIncrementalDecoder.
# Disabled by default (codecEnabled is False); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1822
1823
Christian Heimes1a6387e2008-03-26 12:49:49 +00001824class StatefulIncrementalDecoderTest(unittest.TestCase):
1825 """
1826 Make sure the StatefulIncrementalDecoder actually works.
1827 """
1828
1829 test_cases = [
1830 # I=1, O=1 (fixed-length input == fixed-length output)
1831 (b'abcd', False, 'a.b.c.d.'),
1832 # I=0, O=0 (variable-length input, variable-length output)
1833 (b'oiabcd', True, 'abcd.'),
1834 # I=0, O=0 (should ignore extra periods)
1835 (b'oi...abcd...', True, 'abcd.'),
1836 # I=0, O=6 (variable-length input, fixed-length output)
1837 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1838 # I=2, O=6 (fixed-length input < fixed-length output)
1839 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1840 # I=6, O=3 (fixed-length input > fixed-length output)
1841 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1842 # I=0, then 3; O=29, then 15 (with longer output)
1843 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1844 'a----------------------------.' +
1845 'b----------------------------.' +
1846 'cde--------------------------.' +
1847 'abcdefghijabcde.' +
1848 'a.b------------.' +
1849 '.c.------------.' +
1850 'd.e------------.' +
1851 'k--------------.' +
1852 'l--------------.' +
1853 'm--------------.')
1854 ]
1855
Antoine Pitrou19690592009-06-12 20:14:08 +00001856 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001857 # Try a few one-shot test cases.
1858 for input, eof, output in self.test_cases:
1859 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001860 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001861
1862 # Also test an unfinished decode, followed by forcing EOF.
1863 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001864 self.assertEqual(d.decode(b'oiabcd'), '')
1865 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001866
1867class TextIOWrapperTest(unittest.TestCase):
1868
    def setUp(self):
        # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the same
        # text with every line ending written as "\n".
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        # Remove the scratch file before the test runs.
        support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001873
    def tearDown(self):
        # Remove the scratch file again once the test has finished.
        support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876
Antoine Pitrou19690592009-06-12 20:14:08 +00001877 def test_constructor(self):
1878 r = self.BytesIO(b"\xc3\xa9\n\n")
1879 b = self.BufferedReader(r, 1000)
1880 t = self.TextIOWrapper(b)
1881 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001882 self.assertEqual(t.encoding, "latin1")
1883 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001884 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001885 self.assertEqual(t.encoding, "utf8")
1886 self.assertEqual(t.line_buffering, True)
1887 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 self.assertRaises(TypeError, t.__init__, b, newline=42)
1889 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1890
1891 def test_detach(self):
1892 r = self.BytesIO()
1893 b = self.BufferedWriter(r)
1894 t = self.TextIOWrapper(b)
1895 self.assertIs(t.detach(), b)
1896
1897 t = self.TextIOWrapper(b, encoding="ascii")
1898 t.write("howdy")
1899 self.assertFalse(r.getvalue())
1900 t.detach()
1901 self.assertEqual(r.getvalue(), b"howdy")
1902 self.assertRaises(ValueError, t.detach)
1903
1904 def test_repr(self):
1905 raw = self.BytesIO("hello".encode("utf-8"))
1906 b = self.BufferedReader(raw)
1907 t = self.TextIOWrapper(b, encoding="utf-8")
1908 modname = self.TextIOWrapper.__module__
1909 self.assertEqual(repr(t),
1910 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1911 raw.name = "dummy"
1912 self.assertEqual(repr(t),
1913 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1914 raw.name = b"dummy"
1915 self.assertEqual(repr(t),
1916 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1917
1918 def test_line_buffering(self):
1919 r = self.BytesIO()
1920 b = self.BufferedWriter(r, 1000)
1921 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1922 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001923 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001924 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001925 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001926 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001927 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001928
Antoine Pitrou19690592009-06-12 20:14:08 +00001929 def test_encoding(self):
1930 # Check the encoding attribute is always set, and valid
1931 b = self.BytesIO()
1932 t = self.TextIOWrapper(b, encoding="utf8")
1933 self.assertEqual(t.encoding, "utf8")
1934 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001935 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001936 codecs.lookup(t.encoding)
1937
1938 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001940 b = self.BytesIO(b"abc\n\xff\n")
1941 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001942 self.assertRaises(UnicodeError, t.read)
1943 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001944 b = self.BytesIO(b"abc\n\xff\n")
1945 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001946 self.assertRaises(UnicodeError, t.read)
1947 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001948 b = self.BytesIO(b"abc\n\xff\n")
1949 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001950 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001951 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001952 b = self.BytesIO(b"abc\n\xff\n")
1953 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001954 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001955
Antoine Pitrou19690592009-06-12 20:14:08 +00001956 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001957 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001958 b = self.BytesIO()
1959 t = self.TextIOWrapper(b, encoding="ascii")
1960 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001961 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001962 b = self.BytesIO()
1963 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1964 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001965 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001966 b = self.BytesIO()
1967 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001969 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001970 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001971 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001973 b = self.BytesIO()
1974 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001975 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001976 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001977 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001978 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001979
Antoine Pitrou19690592009-06-12 20:14:08 +00001980 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1982
1983 tests = [
1984 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1985 [ '', input_lines ],
1986 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1987 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1988 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1989 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001990 encodings = (
1991 'utf-8', 'latin-1',
1992 'utf-16', 'utf-16-le', 'utf-16-be',
1993 'utf-32', 'utf-32-le', 'utf-32-be',
1994 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995
1996 # Try a range of buffer sizes to test the case where \r is the last
1997 # character in TextIOWrapper._pending_line.
1998 for encoding in encodings:
1999 # XXX: str.encode() should return bytes
2000 data = bytes(''.join(input_lines).encode(encoding))
2001 for do_reads in (False, True):
2002 for bufsize in range(1, 10):
2003 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002004 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2005 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00002006 encoding=encoding)
2007 if do_reads:
2008 got_lines = []
2009 while True:
2010 c2 = textio.read(2)
2011 if c2 == '':
2012 break
Ezio Melotti2623a372010-11-21 13:34:58 +00002013 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002014 got_lines.append(c2 + textio.readline())
2015 else:
2016 got_lines = list(textio)
2017
2018 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00002019 self.assertEqual(got_line, exp_line)
2020 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021
Antoine Pitrou19690592009-06-12 20:14:08 +00002022 def test_newlines_input(self):
2023 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002024 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2025 for newline, expected in [
2026 (None, normalized.decode("ascii").splitlines(True)),
2027 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00002028 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2029 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2030 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00002031 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 buf = self.BytesIO(testdata)
2033 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00002034 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002035 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002036 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002037
Antoine Pitrou19690592009-06-12 20:14:08 +00002038 def test_newlines_output(self):
2039 testdict = {
2040 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2041 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2042 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2043 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2044 }
2045 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2046 for newline, expected in tests:
2047 buf = self.BytesIO()
2048 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2049 txt.write("AAA\nB")
2050 txt.write("BB\nCCC\n")
2051 txt.write("X\rY\r\nZ")
2052 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002053 self.assertEqual(buf.closed, False)
2054 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002055
2056 def test_destructor(self):
2057 l = []
2058 base = self.BytesIO
2059 class MyBytesIO(base):
2060 def close(self):
2061 l.append(self.getvalue())
2062 base.close(self)
2063 b = MyBytesIO()
2064 t = self.TextIOWrapper(b, encoding="ascii")
2065 t.write("abc")
2066 del t
2067 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002068 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002069
2070 def test_override_destructor(self):
2071 record = []
2072 class MyTextIO(self.TextIOWrapper):
2073 def __del__(self):
2074 record.append(1)
2075 try:
2076 f = super(MyTextIO, self).__del__
2077 except AttributeError:
2078 pass
2079 else:
2080 f()
2081 def close(self):
2082 record.append(2)
2083 super(MyTextIO, self).close()
2084 def flush(self):
2085 record.append(3)
2086 super(MyTextIO, self).flush()
2087 b = self.BytesIO()
2088 t = MyTextIO(b, encoding="ascii")
2089 del t
2090 support.gc_collect()
2091 self.assertEqual(record, [1, 2, 3])
2092
2093 def test_error_through_destructor(self):
2094 # Test that the exception state is not modified by a destructor,
2095 # even if close() fails.
2096 rawio = self.CloseFailureIO()
2097 def f():
2098 self.TextIOWrapper(rawio).xyzzy
2099 with support.captured_output("stderr") as s:
2100 self.assertRaises(AttributeError, f)
2101 s = s.getvalue().strip()
2102 if s:
2103 # The destructor *may* have printed an unraisable error, check it
2104 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002105 self.assertTrue(s.startswith("Exception IOError: "), s)
2106 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107
2108 # Systematic tests of the text I/O API
2109
Antoine Pitrou19690592009-06-12 20:14:08 +00002110 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002111 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2112 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002113 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002114 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002115 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002116 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002117 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002119 self.assertEqual(f.tell(), 0)
2120 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002121 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002122 self.assertEqual(f.seek(0), 0)
2123 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002124 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002125 self.assertEqual(f.read(2), "ab")
2126 self.assertEqual(f.read(1), "c")
2127 self.assertEqual(f.read(1), "")
2128 self.assertEqual(f.read(), "")
2129 self.assertEqual(f.tell(), cookie)
2130 self.assertEqual(f.seek(0), 0)
2131 self.assertEqual(f.seek(0, 2), cookie)
2132 self.assertEqual(f.write("def"), 3)
2133 self.assertEqual(f.seek(cookie), cookie)
2134 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002135 if enc.startswith("utf"):
2136 self.multi_line_test(f, enc)
2137 f.close()
2138
2139 def multi_line_test(self, f, enc):
2140 f.seek(0)
2141 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002142 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002143 wlines = []
2144 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2145 chars = []
2146 for i in range(size):
2147 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002148 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002149 wlines.append((f.tell(), line))
2150 f.write(line)
2151 f.seek(0)
2152 rlines = []
2153 while True:
2154 pos = f.tell()
2155 line = f.readline()
2156 if not line:
2157 break
2158 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002159 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002160
Antoine Pitrou19690592009-06-12 20:14:08 +00002161 def test_telling(self):
2162 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002163 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002164 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002165 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002166 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002167 p2 = f.tell()
2168 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002169 self.assertEqual(f.tell(), p0)
2170 self.assertEqual(f.readline(), "\xff\n")
2171 self.assertEqual(f.tell(), p1)
2172 self.assertEqual(f.readline(), "\xff\n")
2173 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002174 f.seek(0)
2175 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002176 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002177 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002178 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179 f.close()
2180
Antoine Pitrou19690592009-06-12 20:14:08 +00002181 def test_seeking(self):
2182 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002183 prefix_size = chunk_size - 2
2184 u_prefix = "a" * prefix_size
2185 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002186 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002187 u_suffix = "\u8888\n"
2188 suffix = bytes(u_suffix.encode("utf-8"))
2189 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002191 f.write(line*2)
2192 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002193 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002194 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002195 self.assertEqual(s, prefix.decode("ascii"))
2196 self.assertEqual(f.tell(), prefix_size)
2197 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002198
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The reader is now closed on exit; it used to be left open.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        # Use a chunk smaller than the three-byte character in the data.
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2210
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        writer = self.open(support.TESTFN, 'wb')
        writer.write(data)
        writer.close()
        reader = self.open(support.TESTFN, encoding='test_decoder')
        reader._CHUNK_SIZE = CHUNK_SIZE
        decoded = reader.read()
        reader.close()

        total = len(decoded)
        for start in range(min_pos, total + 1):  # seek positions
            for length in [1, 5, total - start]:  # read lengths
                f = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(f.read(start), decoded[:start])
                cookie = f.tell()
                self.assertEqual(f.read(length),
                                 decoded[start:start + length])
                f.seek(cookie)
                self.assertEqual(f.read(), decoded[start:])
                f.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            test_seek_and_tell_with_data(padding + payload, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002257
def test_encoded_writes(self):
    data = "1234567890"
    doubled = data * 2
    encodings = ("utf-16", "utf-16-le", "utf-16-be",
                 "utf-32", "utf-32-le", "utf-32-be")
    for encoding in encodings:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            f.write(data)
        # Read everything back twice, rewinding before each pass.
        for _ in range(2):
            f.seek(0)
            self.assertEqual(f.read(), doubled)
        self.assertEqual(buf.getvalue(), doubled.encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002277
def test_unreadable(self):
    # Wrapping a buffer whose readable() returns False must make
    # read() on the text layer raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        wrapper.read()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002284
def test_read_one_by_one(self):
    # Read a single character at a time; the "\r\n" in the input is
    # expected to come back as "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops on the empty string returned at EOF.
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002294
def test_readlines(self):
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    # No hint and an explicit None hint must both yield every line.
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5 only the first two lines are expected.
    self.assertEqual(txt.readlines(5), every_line[:2])
2302
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    source = self.BytesIO(b"A" * 127 + b"\r\nB")
    txt = self.TextIOWrapper(source)
    # Keep reading 128 characters at a time until read() returns "".
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002314
def test_writelines(self):
    # writelines() with a plain list of text strings writes them
    # back to back.
    buf = self.BytesIO()
    txt = self.TextIOWrapper(buf)
    txt.writelines(['ab', 'cd', 'ef'])
    txt.flush()
    self.assertEqual(buf.getvalue(), b'abcdef')
2322
def test_writelines_userlist(self):
    # writelines() must also accept a UserList, not only a real list.
    lines = UserList(['ab', 'cd', 'ef'])
    buf = self.BytesIO()
    txt = self.TextIOWrapper(buf)
    txt.writelines(lines)
    txt.flush()
    self.assertEqual(buf.getvalue(), b'abcdef')
2330
def test_writelines_error(self):
    txt = self.TextIOWrapper(self.BytesIO())
    # A list of ints, None and a byte string must each raise TypeError.
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, txt.writelines, bad_lines)
2336
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002348
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Read four characters at a time until read() returns "".
    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002360
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three readline() calls.
    pieces = [txt.read(4), txt.read(4)]
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002371
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # One short read, then read() for everything that is left.
    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002379
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Consume four characters, remember the position, rewind, and come
    # back: the next read must continue from the remembered position.
    txt.read(4)
    pos = txt.tell()
    txt.seek(0)
    txt.seek(pos)
    self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002389
def test_issue2282(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    # The text layer must report the same seekable() value as the
    # buffer it wraps.
    self.assertEqual(raw.seekable(), txt.seekable())
2395
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN

    def file_bytes():
        # Return the raw content currently stored in the test file.
        with self.open(filename, 'rb') as f:
            return f.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is not used here; the tell() call itself is
            # kept exactly as in the original sequence of operations.
            f.tell()
        self.assertEqual(file_bytes(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(file_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002410
def test_seek_bom(self):
    # The file content must equal a single encoding of the final text
    # when the writes are positioned manually with seek().
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # First write after the existing text, then overwrite the
            # text at the start of the file.
            for where, text in ((pos, 'zzz'), (0, 'bbb')):
                f.seek(where)
                f.write(text)
        with self.open(filename, 'rb') as f:
            written = f.read()
        self.assertEqual(written, 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002425
def test_errors_property(self):
    # Pairs of (extra keyword arguments for open(), expected f.errors).
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2431
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every thread has been started, so that the
            # writes happen as close together as possible.
            start_signal.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002453
def test_flush_error_on_close(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    def bad_flush():
        raise IOError()
    # Replace flush() on this instance so that close() hits the error.
    txt.flush = bad_flush
    with self.assertRaises(IOError):  # exception not swallowed
        txt.close()
    # The wrapper must still end up closed.
    self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002461
def test_multi_close(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    # Closing repeatedly must not raise ...
    for _ in range(3):
        txt.close()
    # ... but using the closed wrapper must.
    self.assertRaises(ValueError, txt.flush)
2468
def test_readonly_attributes(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    # Rebinding .buffer must be refused; either exception type is
    # accepted.
    self.assertRaises((AttributeError, TypeError),
                      setattr, txt, "buffer", replacement)
2474
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # (The stream class used to be defined twice in a row; the second,
    # identical definition simply rebound the name and was dropped.)
    class NonbytesStream(self.StringIO):
        read1 = self.StringIO.read
    # maybeRaises is supplied by the concrete subclasses, which decide
    # whether a TypeError is required or merely tolerated.
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.read(1)
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.readline()
    t = self.TextIOWrapper(NonbytesStream('a'))
    self.assertEqual(t.read(), u'a')
2490
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    def fresh_wrapper():
        # A new wrapper is built for each call under test.
        return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                  encoding='quopri_codec')

    t = fresh_wrapper()
    with self.maybeRaises(TypeError):
        t.read(1)
    t = fresh_wrapper()
    with self.maybeRaises(TypeError):
        t.readline()
    t = fresh_wrapper()
    with self.maybeRaises(TypeError):
        t.read()
2506
2507
class CTextIOWrapperTest(TextIOWrapperTest):

    # In this variant the shared tests that use maybeRaises() require the
    # exception: the hook is plain assertRaises.
    maybeRaises = unittest.TestCase.assertRaises

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        t = self.TextIOWrapper(buffered)
        # After each rejected __init__ call, read() must raise ValueError.
        for exc, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc, t.__init__, buffered, newline=bad_newline)
            self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(buffered, encoding="ascii")
        t.write("456def")
        # Make the wrapper part of a reference cycle.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
        self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
2550
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002551
class PyTextIOWrapperTest(TextIOWrapperTest):

    @contextlib.contextmanager
    def maybeRaises(self, *args, **kwds):
        """Context manager that accepts any arguments and checks nothing.

        The body of the ``with`` block simply runs; an exception raised
        inside it is neither required nor suppressed.
        """
        yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002556
2557
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        def _check_all(steps):
            # Run a list of (input bytes, expected text) pairs in order.
            for raw, expected in steps:
                _check_decode(raw, expected)

        # One three-byte character fed whole, then twice byte by byte.
        bytewise = [(b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888")]
        _check_all([(b'\xe8\xa2\x88', "\u8888")] + bytewise + bytewise)

        # A truncated character must be reported once input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_all([(b'\n', "\n"), (b'\r', "")])
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_all([
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        result = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(s):
            # Decode one byte at a time when an encoder is in use,
            # otherwise one char at a time.
            units = s if encoder is None else encoder.encode(s)
            for unit in units:
                result.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # Pairs of (text to feed, decoder.newlines expected afterwards).
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen in steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            # Neither character may be reported as a newline.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2663
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test comes from the base class.
    pass
2666
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test comes from the base class.
    pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002669
Christian Heimes1a6387e2008-03-26 12:49:49 +00002670
2671# XXX Tests for open()
2672
class MiscIOTest(unittest.TestCase):

    def tearDown(self):
        # Remove the scratch file the tests may have created.
        support.unlink(support.TESTFN)

    def test___all__(self):
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            if "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        # Text file: check name, then mode, on each layer of the stack.
        f = self.open(support.TESTFN, "U")
        for layer in (f, f.buffer, f.buffer.raw):
            self.assertEqual(layer.name, support.TESTFN)
        for layer, mode in ((f, "U"), (f.buffer, "rb"), (f.buffer.raw, "rb")):
            self.assertEqual(layer.mode, mode)
        f.close()

        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object opened on the same descriptor reports the
        # descriptor number as its name.
        g = self.open(f.fileno(), "wb", closefd=False)
        for layer in (g, g.raw):
            self.assertEqual(layer.mode, "wb")
        for layer in (g, g.raw):
            self.assertEqual(layer.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        open_variants = [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]
        for kwargs in open_variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, call arguments, only checked when present)
            checks = [
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("readall", (), True),
                ("readinto", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty,), False),
                ("writelines", ([],), False),
            ]
            for method, args, optional in checks:
                if optional and not hasattr(f, method):
                    continue
                self.assertRaises(ValueError, getattr(f, method), *args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
            self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Tie the exception and its argument into a reference cycle and
        # check that the collector can still reclaim them.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        for base in (self.IOBase, self.RawIOBase,
                     self.BufferedIOBase, self.TextIOBase):
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # For each way of opening the file, the names in the second item
        # are the ABCs the object must be an instance of; it must not be
        # an instance of the remaining ones.
        abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
        cases = (
            ({"mode": "wb", "buffering": 0}, ("IOBase", "RawIOBase")),
            ({"mode": "wb"}, ("IOBase", "BufferedIOBase")),
            ({"mode": "w"}, ("IOBase", "TextIOBase")),
        )
        for kwargs, expected in cases:
            with self.open(support.TESTFN, **kwargs) as f:
                for name in abc_names:
                    if name in expected:
                        check = self.assertIsInstance
                    else:
                        check = self.assertNotIsInstance
                    check(f, getattr(abcmodule, name))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        # Add O_NONBLOCK to the descriptor's existing status flags.
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(current, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    # Keep writing until the pipe refuses more data.
                    for i in count():
                        # NOTE(review): this file targets Python 2 (it
                        # imports UserList and uses `unicode`), where
                        # bytes is str, so bytes([n]) is the text of a
                        # list rather than one byte -- confirm intended.
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Only the part that was accepted counts as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Drain the reader until flush() finally goes through.
            flushed = False
            while not flushed:
                try:
                    wf.flush()
                    flushed = True
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect what is left, stopping when read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
2872
class CMiscIOTest(MiscIOTest):
    # Run the shared tests against the module bound to the name `io`.
    io = io
2875
class PyMiscIOTest(MiscIOTest):
    # Run the shared tests against the module bound to the name `pyio`.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002878
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002879
2880@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2881class SignalsTest(unittest.TestCase):
2882
def setUp(self):
    # Install our SIGALRM handler and keep the previous one so that
    # tearDown() can put it back.
    previous = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous
2885
def tearDown(self):
    # Restore the SIGALRM handler saved by setUp().
    previous = self.oldalrm
    signal.signal(signal.SIGALRM, previous)
2888
def alarm_interrupt(self, sig, frame):
    """Signal handler that always fails with ZeroDivisionError.

    The arguments are the ones passed to every signal handler and are
    ignored; the tests look for the exception raised here.
    """
    1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002891
@unittest.skipUnless(threading, 'Threading required for this test.')
@unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    item is the value written (repeated until it exceeds
    support.PIPE_MAX_SIZE); bytes is the byte string whose first two
    bytes are expected to come out of the pipe; fdopen_kwargs are
    passed on to self.io.open() for the write end of the pipe.
    """
    read_results = []
    def _read():
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    # Bound before the try block so that the cleanup below can tell
    # whether open() succeeded; otherwise a failing open() would be
    # masked by a NameError raised from the finally clause.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        signal.alarm(1)
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        self.assertRaises(ZeroDivisionError,
                    wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
2934
2935 def test_interrupted_write_unbuffered(self):
2936 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2937
2938 def test_interrupted_write_buffered(self):
2939 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2940
2941 def test_interrupted_write_text(self):
2942 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2943
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002944 def check_reentrant_write(self, data, **fdopen_kwargs):
2945 def on_alarm(*args):
2946 # Will be called reentrantly from the same thread
2947 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002948 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002949 signal.signal(signal.SIGALRM, on_alarm)
2950 r, w = os.pipe()
2951 wio = self.io.open(w, **fdopen_kwargs)
2952 try:
2953 signal.alarm(1)
2954 # Either the reentrant call to wio.write() fails with RuntimeError,
2955 # or the signal handler raises ZeroDivisionError.
2956 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2957 while 1:
2958 for i in range(100):
2959 wio.write(data)
2960 wio.flush()
2961 # Make sure the buffer doesn't fill up and block further writes
2962 os.read(r, len(data) * 100)
2963 exc = cm.exception
2964 if isinstance(exc, RuntimeError):
2965 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2966 finally:
2967 wio.close()
2968 os.close(r)
2969
2970 def test_reentrant_write_buffered(self):
2971 self.check_reentrant_write(b"xy", mode="wb")
2972
2973 def test_reentrant_write_text(self):
2974 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2975
Antoine Pitrou6439c002011-02-25 21:35:47 +00002976 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2977 """Check that a buffered read, when it gets interrupted (either
2978 returning a partial result or EINTR), properly invokes the signal
2979 handler and retries if the latter returned successfully."""
2980 r, w = os.pipe()
2981 fdopen_kwargs["closefd"] = False
2982 def alarm_handler(sig, frame):
2983 os.write(w, b"bar")
2984 signal.signal(signal.SIGALRM, alarm_handler)
2985 try:
2986 rio = self.io.open(r, **fdopen_kwargs)
2987 os.write(w, b"foo")
2988 signal.alarm(1)
2989 # Expected behaviour:
2990 # - first raw read() returns partial b"foo"
2991 # - second raw read() returns EINTR
2992 # - third raw read() returns b"bar"
2993 self.assertEqual(decode(rio.read(6)), "foobar")
2994 finally:
2995 rio.close()
2996 os.close(w)
2997 os.close(r)
2998
2999 def test_interrupterd_read_retry_buffered(self):
3000 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3001 mode="rb")
3002
3003 def test_interrupterd_read_retry_text(self):
3004 self.check_interrupted_read_retry(lambda x: x,
3005 mode="r")
3006
3007 @unittest.skipUnless(threading, 'Threading required for this test.')
3008 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3009 """Check that a buffered write, when it gets interrupted (either
3010 returning a partial result or EINTR), properly invokes the signal
3011 handler and retries if the latter returned successfully."""
3012 select = support.import_module("select")
3013 # A quantity that exceeds the buffer size of an anonymous pipe's
3014 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02003015 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00003016 r, w = os.pipe()
3017 fdopen_kwargs["closefd"] = False
3018 # We need a separate thread to read from the pipe and allow the
3019 # write() to finish. This thread is started after the SIGALRM is
3020 # received (forcing a first EINTR in write()).
3021 read_results = []
3022 write_finished = False
3023 def _read():
3024 while not write_finished:
3025 while r in select.select([r], [], [], 1.0)[0]:
3026 s = os.read(r, 1024)
3027 read_results.append(s)
3028 t = threading.Thread(target=_read)
3029 t.daemon = True
3030 def alarm1(sig, frame):
3031 signal.signal(signal.SIGALRM, alarm2)
3032 signal.alarm(1)
3033 def alarm2(sig, frame):
3034 t.start()
3035 signal.signal(signal.SIGALRM, alarm1)
3036 try:
3037 wio = self.io.open(w, **fdopen_kwargs)
3038 signal.alarm(1)
3039 # Expected behaviour:
3040 # - first raw write() is partial (because of the limited pipe buffer
3041 # and the first alarm)
3042 # - second raw write() returns EINTR (because of the second alarm)
3043 # - subsequent write()s are successful (either partial or complete)
3044 self.assertEqual(N, wio.write(item * N))
3045 wio.flush()
3046 write_finished = True
3047 t.join()
3048 self.assertEqual(N, sum(len(x) for x in read_results))
3049 finally:
3050 write_finished = True
3051 os.close(w)
3052 os.close(r)
3053 # This is deliberate. If we didn't close the file descriptor
3054 # before closing wio, wio would try to flush its internal
3055 # buffer, and could block (in case of failure).
3056 try:
3057 wio.close()
3058 except IOError as e:
3059 if e.errno != errno.EBADF:
3060 raise
3061
3062 def test_interrupterd_write_retry_buffered(self):
3063 self.check_interrupted_write_retry(b"x", mode="wb")
3064
3065 def test_interrupterd_write_retry_text(self):
3066 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3067
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003068
class CSignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the C implementation
    # (the module imported as ``io``).
    io = io
3071
class PySignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the pure-Python
    # implementation (the module imported as ``pyio``).
    io = pyio

    # The reentrancy tests are switched off for this implementation:
    # handling reentrancy issues would slow down _pyio even more.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3079
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003080
def test_main():
    """Inject the io namespaces into the test classes, then run them all.

    Classes whose name starts with "C" receive the public names of the C
    ``io`` module plus the "C"-prefixed mock classes; classes whose name
    starts with "Py" receive the same names taken from ``pyio`` plus the
    "Py"-prefixed mocks.  Other classes are left untouched.
    """
    test_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Mock classes exposed to the tests under their unprefixed name.
    mock_classes = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
                    MockNonBlockWriterIO, MockRawIOWithoutRead)

    # Build one attribute namespace per implementation from the public
    # names of the module under test.
    member_names = io.__all__ + ["IncrementalNewlineDecoder"]
    c_namespace = {name: getattr(io, name) for name in member_names}
    py_namespace = {name: getattr(pyio, name) for name in member_names}

    # Each mock has a "C" and a "Py" flavour defined at module level.
    module_globals = globals()
    for mock in mock_classes:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mock_classes:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test_class in test_classes:
        class_name = test_class.__name__
        if class_name.startswith("C"):
            namespace = c_namespace
        elif class_name.startswith("Py"):
            namespace = py_namespace
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for attr_name, attr_value in namespace.items():
            setattr(test_class, attr_name, attr_value)

    support.run_unittest(*test_classes)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003115
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()