blob: 3751e6f9316ebed18b4f13ac3e500bfae417407f [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import warnings
33import weakref
34import gc
35import abc
36from itertools import chain, cycle, count
37from collections import deque
38from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000039
40import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
43
44__metaclass__ = type
45bytes = support.py3k_bytes
46
47def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io.open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000051
52
Antoine Pitrou19690592009-06-12 20:14:08 +000053class MockRawIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55 def __init__(self, read_stack=()):
56 self._read_stack = list(read_stack)
57 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000058 self._reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +000061 self._reads += 1
Christian Heimes1a6387e2008-03-26 12:49:49 +000062 try:
63 return self._read_stack.pop(0)
64 except:
65 return b""
66
67 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000068 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000069 return len(b)
70
71 def writable(self):
72 return True
73
74 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
80 def seekable(self):
81 return True
82
83 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000084 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000085
86 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
95 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
112class CMockRawIO(MockRawIO, io.RawIOBase):
113 pass
114
115class PyMockRawIO(MockRawIO, pyio.RawIOBase):
116 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000117
118
Antoine Pitrou19690592009-06-12 20:14:08 +0000119class MisbehavedRawIO(MockRawIO):
120 def write(self, b):
121 return MockRawIO.write(self, b) * 2
122
123 def read(self, n=None):
124 return MockRawIO.read(self, n) * 2
125
126 def seek(self, pos, whence):
127 return -123
128
129 def tell(self):
130 return -456
131
132 def readinto(self, buf):
133 MockRawIO.readinto(self, buf)
134 return len(buf) * 5
135
136class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
137 pass
138
139class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
140 pass
141
142
143class CloseFailureIO(MockRawIO):
144 closed = 0
145
146 def close(self):
147 if not self.closed:
148 self.closed = 1
149 raise IOError
150
151class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
152 pass
153
154class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
155 pass
156
157
158class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000159
160 def __init__(self, data):
161 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000162 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000163
164 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000165 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000166 self.read_history.append(None if res is None else len(res))
167 return res
168
Antoine Pitrou19690592009-06-12 20:14:08 +0000169 def readinto(self, b):
170 res = super(MockFileIO, self).readinto(b)
171 self.read_history.append(res)
172 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000173
Antoine Pitrou19690592009-06-12 20:14:08 +0000174class CMockFileIO(MockFileIO, io.BytesIO):
175 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
Antoine Pitrou19690592009-06-12 20:14:08 +0000177class PyMockFileIO(MockFileIO, pyio.BytesIO):
178 pass
179
180
181class MockNonBlockWriterIO:
182
183 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000184 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000185 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
Antoine Pitrou19690592009-06-12 20:14:08 +0000187 def pop_written(self):
188 s = b"".join(self._write_stack)
189 self._write_stack[:] = []
190 return s
191
192 def block_on(self, char):
193 """Block when a given char is encountered."""
194 self._blocker_char = char
195
196 def readable(self):
197 return True
198
199 def seekable(self):
200 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201
202 def writable(self):
203 return True
204
Antoine Pitrou19690592009-06-12 20:14:08 +0000205 def write(self, b):
206 b = bytes(b)
207 n = -1
208 if self._blocker_char:
209 try:
210 n = b.index(self._blocker_char)
211 except ValueError:
212 pass
213 else:
214 self._blocker_char = None
215 self._write_stack.append(b[:n])
216 raise self.BlockingIOError(0, "test blocking", n)
217 self._write_stack.append(b)
218 return len(b)
219
220class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
221 BlockingIOError = io.BlockingIOError
222
223class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
224 BlockingIOError = pyio.BlockingIOError
225
Christian Heimes1a6387e2008-03-26 12:49:49 +0000226
227class IOTest(unittest.TestCase):
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def setUp(self):
230 support.unlink(support.TESTFN)
231
Christian Heimes1a6387e2008-03-26 12:49:49 +0000232 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000233 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000234
235 def write_ops(self, f):
236 self.assertEqual(f.write(b"blah."), 5)
237 self.assertEqual(f.seek(0), 0)
238 self.assertEqual(f.write(b"Hello."), 6)
239 self.assertEqual(f.tell(), 6)
240 self.assertEqual(f.seek(-1, 1), 5)
241 self.assertEqual(f.tell(), 5)
242 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
243 self.assertEqual(f.seek(0), 0)
244 self.assertEqual(f.write(b"h"), 1)
245 self.assertEqual(f.seek(-1, 2), 13)
246 self.assertEqual(f.tell(), 13)
247 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000248 self.assertEqual(f.tell(), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 self.assertRaises(TypeError, f.seek, 0.0)
250
251 def read_ops(self, f, buffered=False):
252 data = f.read(5)
253 self.assertEqual(data, b"hello")
254 data = bytearray(data)
255 self.assertEqual(f.readinto(data), 5)
256 self.assertEqual(data, b" worl")
257 self.assertEqual(f.readinto(data), 2)
258 self.assertEqual(len(data), 5)
259 self.assertEqual(data[:2], b"d\n")
260 self.assertEqual(f.seek(0), 0)
261 self.assertEqual(f.read(20), b"hello world\n")
262 self.assertEqual(f.read(1), b"")
263 self.assertEqual(f.readinto(bytearray(b"x")), 0)
264 self.assertEqual(f.seek(-6, 2), 6)
265 self.assertEqual(f.read(5), b"world")
266 self.assertEqual(f.read(0), b"")
267 self.assertEqual(f.readinto(bytearray()), 0)
268 self.assertEqual(f.seek(-6, 1), 5)
269 self.assertEqual(f.read(5), b" worl")
270 self.assertEqual(f.tell(), 10)
271 self.assertRaises(TypeError, f.seek, 0.0)
272 if buffered:
273 f.seek(0)
274 self.assertEqual(f.read(), b"hello world\n")
275 f.seek(6)
276 self.assertEqual(f.read(), b"world\n")
277 self.assertEqual(f.read(), b"")
278
279 LARGE = 2**31
280
281 def large_file_ops(self, f):
282 assert f.readable()
283 assert f.writable()
284 self.assertEqual(f.seek(self.LARGE), self.LARGE)
285 self.assertEqual(f.tell(), self.LARGE)
286 self.assertEqual(f.write(b"xxx"), 3)
287 self.assertEqual(f.tell(), self.LARGE + 3)
288 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
289 self.assertEqual(f.truncate(), self.LARGE + 2)
290 self.assertEqual(f.tell(), self.LARGE + 2)
291 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
292 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000293 self.assertEqual(f.tell(), self.LARGE + 1)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000294 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
295 self.assertEqual(f.seek(-1, 2), self.LARGE)
296 self.assertEqual(f.read(2), b"x")
297
Antoine Pitrou19690592009-06-12 20:14:08 +0000298 def test_invalid_operations(self):
299 # Try writing on a file opened in read mode and vice-versa.
300 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000301 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000302 self.assertRaises(IOError, fp.read)
303 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000304 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000305 self.assertRaises(IOError, fp.write, b"blah")
306 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000307 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000308 self.assertRaises(IOError, fp.write, "blah")
309 self.assertRaises(IOError, fp.writelines, ["blah\n"])
310
Christian Heimes1a6387e2008-03-26 12:49:49 +0000311 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000312 with self.open(support.TESTFN, "wb", buffering=0) as f:
313 self.assertEqual(f.readable(), False)
314 self.assertEqual(f.writable(), True)
315 self.assertEqual(f.seekable(), True)
316 self.write_ops(f)
317 with self.open(support.TESTFN, "rb", buffering=0) as f:
318 self.assertEqual(f.readable(), True)
319 self.assertEqual(f.writable(), False)
320 self.assertEqual(f.seekable(), True)
321 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000322
323 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000324 with self.open(support.TESTFN, "wb") as f:
325 self.assertEqual(f.readable(), False)
326 self.assertEqual(f.writable(), True)
327 self.assertEqual(f.seekable(), True)
328 self.write_ops(f)
329 with self.open(support.TESTFN, "rb") as f:
330 self.assertEqual(f.readable(), True)
331 self.assertEqual(f.writable(), False)
332 self.assertEqual(f.seekable(), True)
333 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334
335 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000336 with self.open(support.TESTFN, "wb") as f:
337 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
338 with self.open(support.TESTFN, "rb") as f:
339 self.assertEqual(f.readline(), b"abc\n")
340 self.assertEqual(f.readline(10), b"def\n")
341 self.assertEqual(f.readline(2), b"xy")
342 self.assertEqual(f.readline(4), b"zzy\n")
343 self.assertEqual(f.readline(), b"foo\x00bar\n")
344 self.assertEqual(f.readline(), b"another line")
345 self.assertRaises(TypeError, f.readline, 5.3)
346 with self.open(support.TESTFN, "r") as f:
347 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000348
349 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000350 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000351 self.write_ops(f)
352 data = f.getvalue()
353 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000354 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000355 self.read_ops(f, True)
356
357 def test_large_file_ops(self):
358 # On Windows and Mac OSX this test comsumes large resources; It takes
359 # a long time to build the >2GB file and takes >2GB of disk space
360 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000361 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
362 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000363 print("\nTesting large file ops skipped on %s." % sys.platform,
364 file=sys.stderr)
365 print("It requires %d bytes and a long time." % self.LARGE,
366 file=sys.stderr)
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
368 file=sys.stderr)
369 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000370 with self.open(support.TESTFN, "w+b", 0) as f:
371 self.large_file_ops(f)
372 with self.open(support.TESTFN, "w+b") as f:
373 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374
375 def test_with_open(self):
376 for bufsize in (0, 1, 100):
377 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000378 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000379 f.write(b"xxx")
380 self.assertEqual(f.closed, True)
381 f = None
382 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000383 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000384 1/0
385 except ZeroDivisionError:
386 self.assertEqual(f.closed, True)
387 else:
388 self.fail("1/0 didn't raise an exception")
389
Antoine Pitroue741cc62009-01-21 00:45:36 +0000390 # issue 5008
391 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000392 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000393 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000394 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000395 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000397 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000398 with self.open(support.TESTFN, "a") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000399 self.assert_(f.tell() > 0)
400
Christian Heimes1a6387e2008-03-26 12:49:49 +0000401 def test_destructor(self):
402 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000403 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404 def __del__(self):
405 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000406 try:
407 f = super(MyFileIO, self).__del__
408 except AttributeError:
409 pass
410 else:
411 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412 def close(self):
413 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000414 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 def flush(self):
416 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 super(MyFileIO, self).flush()
418 f = MyFileIO(support.TESTFN, "wb")
419 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000420 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 support.gc_collect()
422 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000423 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 self.assertEqual(f.read(), b"xxx")
425
426 def _check_base_destructor(self, base):
427 record = []
428 class MyIO(base):
429 def __init__(self):
430 # This exercises the availability of attributes on object
431 # destruction.
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
434 self.on_del = 1
435 self.on_close = 2
436 self.on_flush = 3
437 def __del__(self):
438 record.append(self.on_del)
439 try:
440 f = super(MyIO, self).__del__
441 except AttributeError:
442 pass
443 else:
444 f()
445 def close(self):
446 record.append(self.on_close)
447 super(MyIO, self).close()
448 def flush(self):
449 record.append(self.on_flush)
450 super(MyIO, self).flush()
451 f = MyIO()
452 del f
453 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000454 self.assertEqual(record, [1, 2, 3])
455
Antoine Pitrou19690592009-06-12 20:14:08 +0000456 def test_IOBase_destructor(self):
457 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000458
Antoine Pitrou19690592009-06-12 20:14:08 +0000459 def test_RawIOBase_destructor(self):
460 self._check_base_destructor(self.RawIOBase)
461
462 def test_BufferedIOBase_destructor(self):
463 self._check_base_destructor(self.BufferedIOBase)
464
465 def test_TextIOBase_destructor(self):
466 self._check_base_destructor(self.TextIOBase)
467
468 def test_close_flushes(self):
469 with self.open(support.TESTFN, "wb") as f:
470 f.write(b"xxx")
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
473
474 def test_array_writes(self):
475 a = array.array(b'i', range(10))
476 n = len(a.tostring())
477 with self.open(support.TESTFN, "wb", 0) as f:
478 self.assertEqual(f.write(a), n)
479 with self.open(support.TESTFN, "wb") as f:
480 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
482 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000483 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 closefd=False)
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_read_closed(self):
487 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000488 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 with self.open(support.TESTFN, "r") as f:
490 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000491 self.assertEqual(file.read(), "egg\n")
492 file.seek(0)
493 file.close()
494 self.assertRaises(ValueError, file.read)
495
496 def test_no_closefd_with_filename(self):
497 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000498 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000499
500 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000501 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000502 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000503 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000504 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000505 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000506 self.assertEqual(file.buffer.raw.closefd, False)
507
Antoine Pitrou19690592009-06-12 20:14:08 +0000508 def test_garbage_collection(self):
509 # FileIO objects are collected, and collecting them flushes
510 # all data to disk.
511 f = self.FileIO(support.TESTFN, "wb")
512 f.write(b"abcxxx")
513 f.f = f
514 wr = weakref.ref(f)
515 del f
516 support.gc_collect()
517 self.assert_(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000518 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000520
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 def test_unbounded_file(self):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
523 zero = "/dev/zero"
524 if not os.path.exists(zero):
525 self.skipTest("{0} does not exist".format(zero))
526 if sys.maxsize > 0x7FFFFFFF:
527 self.skipTest("test can only run in a 32-bit address space")
528 if support.real_max_memuse < support._2G:
529 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000530 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000532 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000534 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000536
Antoine Pitrou19690592009-06-12 20:14:08 +0000537class CIOTest(IOTest):
538 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000539
Antoine Pitrou19690592009-06-12 20:14:08 +0000540class PyIOTest(IOTest):
541 test_array_writes = unittest.skip(
542 "len(array.array) returns number of elements rather than bytelength"
543 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000544
545
Antoine Pitrou19690592009-06-12 20:14:08 +0000546class CommonBufferedTests:
547 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
548
549 def test_detach(self):
550 raw = self.MockRawIO()
551 buf = self.tp(raw)
552 self.assertIs(buf.detach(), raw)
553 self.assertRaises(ValueError, buf.detach)
554
555 def test_fileno(self):
556 rawio = self.MockRawIO()
557 bufio = self.tp(rawio)
558
559 self.assertEquals(42, bufio.fileno())
560
561 def test_no_fileno(self):
562 # XXX will we always have fileno() function? If so, kill
563 # this test. Else, write it.
564 pass
565
566 def test_invalid_args(self):
567 rawio = self.MockRawIO()
568 bufio = self.tp(rawio)
569 # Invalid whence
570 self.assertRaises(ValueError, bufio.seek, 0, -1)
571 self.assertRaises(ValueError, bufio.seek, 0, 3)
572
573 def test_override_destructor(self):
574 tp = self.tp
575 record = []
576 class MyBufferedIO(tp):
577 def __del__(self):
578 record.append(1)
579 try:
580 f = super(MyBufferedIO, self).__del__
581 except AttributeError:
582 pass
583 else:
584 f()
585 def close(self):
586 record.append(2)
587 super(MyBufferedIO, self).close()
588 def flush(self):
589 record.append(3)
590 super(MyBufferedIO, self).flush()
591 rawio = self.MockRawIO()
592 bufio = MyBufferedIO(rawio)
593 writable = bufio.writable()
594 del bufio
595 support.gc_collect()
596 if writable:
597 self.assertEqual(record, [1, 2, 3])
598 else:
599 self.assertEqual(record, [1, 2])
600
601 def test_context_manager(self):
602 # Test usability as a context manager
603 rawio = self.MockRawIO()
604 bufio = self.tp(rawio)
605 def _with():
606 with bufio:
607 pass
608 _with()
609 # bufio should now be closed, and using it a second time should raise
610 # a ValueError.
611 self.assertRaises(ValueError, _with)
612
613 def test_error_through_destructor(self):
614 # Test that the exception state is not modified by a destructor,
615 # even if close() fails.
616 rawio = self.CloseFailureIO()
617 def f():
618 self.tp(rawio).xyzzy
619 with support.captured_output("stderr") as s:
620 self.assertRaises(AttributeError, f)
621 s = s.getvalue().strip()
622 if s:
623 # The destructor *may* have printed an unraisable error, check it
624 self.assertEqual(len(s.splitlines()), 1)
625 self.assert_(s.startswith("Exception IOError: "), s)
626 self.assert_(s.endswith(" ignored"), s)
627
628 def test_repr(self):
629 raw = self.MockRawIO()
630 b = self.tp(raw)
631 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
632 self.assertEqual(repr(b), "<%s>" % clsname)
633 raw.name = "dummy"
634 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
635 raw.name = b"dummy"
636 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000637
638
Antoine Pitrou19690592009-06-12 20:14:08 +0000639class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
640 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000641
Antoine Pitrou19690592009-06-12 20:14:08 +0000642 def test_constructor(self):
643 rawio = self.MockRawIO([b"abc"])
644 bufio = self.tp(rawio)
645 bufio.__init__(rawio)
646 bufio.__init__(rawio, buffer_size=1024)
647 bufio.__init__(rawio, buffer_size=16)
648 self.assertEquals(b"abc", bufio.read())
649 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
650 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
651 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
652 rawio = self.MockRawIO([b"abc"])
653 bufio.__init__(rawio)
654 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000655
Antoine Pitrou19690592009-06-12 20:14:08 +0000656 def test_read(self):
657 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
658 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000659 self.assertEquals(b"abcdef", bufio.read(6))
Antoine Pitrou19690592009-06-12 20:14:08 +0000660 # Invalid args
661 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000662
Antoine Pitrou19690592009-06-12 20:14:08 +0000663 def test_read1(self):
664 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
665 bufio = self.tp(rawio)
666 self.assertEquals(b"a", bufio.read(1))
667 self.assertEquals(b"b", bufio.read1(1))
668 self.assertEquals(rawio._reads, 1)
669 self.assertEquals(b"c", bufio.read1(100))
670 self.assertEquals(rawio._reads, 1)
671 self.assertEquals(b"d", bufio.read1(100))
672 self.assertEquals(rawio._reads, 2)
673 self.assertEquals(b"efg", bufio.read1(100))
674 self.assertEquals(rawio._reads, 3)
675 self.assertEquals(b"", bufio.read1(100))
676 # Invalid args
677 self.assertRaises(ValueError, bufio.read1, -1)
678
679 def test_readinto(self):
680 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
681 bufio = self.tp(rawio)
682 b = bytearray(2)
683 self.assertEquals(bufio.readinto(b), 2)
684 self.assertEquals(b, b"ab")
685 self.assertEquals(bufio.readinto(b), 2)
686 self.assertEquals(b, b"cd")
687 self.assertEquals(bufio.readinto(b), 2)
688 self.assertEquals(b, b"ef")
689 self.assertEquals(bufio.readinto(b), 1)
690 self.assertEquals(b, b"gf")
691 self.assertEquals(bufio.readinto(b), 0)
692 self.assertEquals(b, b"gf")
693
694 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000695 data = b"abcdefghi"
696 dlen = len(data)
697
698 tests = [
699 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
700 [ 100, [ 3, 3, 3], [ dlen ] ],
701 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
702 ]
703
704 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000705 rawio = self.MockFileIO(data)
706 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000707 pos = 0
708 for nbytes in buf_read_sizes:
709 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
710 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000711 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000712 self.assertEquals(rawio.read_history, raw_read_sizes)
713
Antoine Pitrou19690592009-06-12 20:14:08 +0000714 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000715 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000716 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
717 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000718
719 self.assertEquals(b"abcd", bufio.read(6))
720 self.assertEquals(b"e", bufio.read(1))
721 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 self.assertEquals(b"", bufio.peek(1))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000723 self.assert_(None is bufio.read())
724 self.assertEquals(b"", bufio.read())
725
Antoine Pitrou19690592009-06-12 20:14:08 +0000726 def test_read_past_eof(self):
727 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
728 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729
730 self.assertEquals(b"abcdefg", bufio.read(9000))
731
Antoine Pitrou19690592009-06-12 20:14:08 +0000732 def test_read_all(self):
733 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
734 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000735
736 self.assertEquals(b"abcdefg", bufio.read())
737
Antoine Pitrou19690592009-06-12 20:14:08 +0000738 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000739 try:
740 # Write out many bytes with exactly the same number of 0's,
741 # 1's... 255's. This will help us check that concurrent reading
742 # doesn't duplicate or forget contents.
743 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000744 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000745 random.shuffle(l)
746 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000747 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000748 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000749 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000750 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000751 errors = []
752 results = []
753 def f():
754 try:
755 # Intra-buffer read then buffer-flushing read
756 for n in cycle([1, 19]):
757 s = bufio.read(n)
758 if not s:
759 break
760 # list.append() is atomic
761 results.append(s)
762 except Exception as e:
763 errors.append(e)
764 raise
765 threads = [threading.Thread(target=f) for x in range(20)]
766 for t in threads:
767 t.start()
768 time.sleep(0.02) # yield
769 for t in threads:
770 t.join()
771 self.assertFalse(errors,
772 "the following exceptions were caught: %r" % errors)
773 s = b''.join(results)
774 for i in range(256):
775 c = bytes(bytearray([i]))
776 self.assertEqual(s.count(c), N)
777 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000778 support.unlink(support.TESTFN)
779
780 def test_misbehaved_io(self):
781 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
782 bufio = self.tp(rawio)
783 self.assertRaises(IOError, bufio.seek, 0)
784 self.assertRaises(IOError, bufio.tell)
785
786class CBufferedReaderTest(BufferedReaderTest):
787 tp = io.BufferedReader
788
789 def test_constructor(self):
790 BufferedReaderTest.test_constructor(self)
791 # The allocation can succeed on 32-bit builds, e.g. with more
792 # than 2GB RAM and a 64-bit kernel.
793 if sys.maxsize > 0x7FFFFFFF:
794 rawio = self.MockRawIO()
795 bufio = self.tp(rawio)
796 self.assertRaises((OverflowError, MemoryError, ValueError),
797 bufio.__init__, rawio, sys.maxsize)
798
799 def test_initialization(self):
800 rawio = self.MockRawIO([b"abc"])
801 bufio = self.tp(rawio)
802 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
803 self.assertRaises(ValueError, bufio.read)
804 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
805 self.assertRaises(ValueError, bufio.read)
806 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
807 self.assertRaises(ValueError, bufio.read)
808
809 def test_misbehaved_io_read(self):
810 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
811 bufio = self.tp(rawio)
812 # _pyio.BufferedReader seems to implement reading different, so that
813 # checking this is not so easy.
814 self.assertRaises(IOError, bufio.read, 10)
815
816 def test_garbage_collection(self):
817 # C BufferedReader objects are collected.
818 # The Python version has __del__, so it ends into gc.garbage instead
819 rawio = self.FileIO(support.TESTFN, "w+b")
820 f = self.tp(rawio)
821 f.f = f
822 wr = weakref.ref(f)
823 del f
824 support.gc_collect()
825 self.assert_(wr() is None, wr)
826
827class PyBufferedReaderTest(BufferedReaderTest):
828 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000829
830
Antoine Pitrou19690592009-06-12 20:14:08 +0000831class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
832 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000833
Antoine Pitrou19690592009-06-12 20:14:08 +0000834 def test_constructor(self):
835 rawio = self.MockRawIO()
836 bufio = self.tp(rawio)
837 bufio.__init__(rawio)
838 bufio.__init__(rawio, buffer_size=1024)
839 bufio.__init__(rawio, buffer_size=16)
840 self.assertEquals(3, bufio.write(b"abc"))
841 bufio.flush()
842 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
843 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
844 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
845 bufio.__init__(rawio)
846 self.assertEquals(3, bufio.write(b"ghi"))
847 bufio.flush()
848 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000849
Antoine Pitrou19690592009-06-12 20:14:08 +0000850 def test_detach_flush(self):
851 raw = self.MockRawIO()
852 buf = self.tp(raw)
853 buf.write(b"howdy!")
854 self.assertFalse(raw._write_stack)
855 buf.detach()
856 self.assertEqual(raw._write_stack, [b"howdy!"])
857
858 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000859 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000860 writer = self.MockRawIO()
861 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000862 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000863 self.assertFalse(writer._write_stack)
864
Antoine Pitrou19690592009-06-12 20:14:08 +0000865 def test_write_overflow(self):
866 writer = self.MockRawIO()
867 bufio = self.tp(writer, 8)
868 contents = b"abcdefghijklmnop"
869 for n in range(0, len(contents), 3):
870 bufio.write(contents[n:n+3])
871 flushed = b"".join(writer._write_stack)
872 # At least (total - 8) bytes were implicitly flushed, perhaps more
873 # depending on the implementation.
874 self.assert_(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000875
Antoine Pitrou19690592009-06-12 20:14:08 +0000876 def check_writes(self, intermediate_func):
877 # Lots of writes, test the flushed output is as expected.
878 contents = bytes(range(256)) * 1000
879 n = 0
880 writer = self.MockRawIO()
881 bufio = self.tp(writer, 13)
882 # Generator of write sizes: repeat each N 15 times then proceed to N+1
883 def gen_sizes():
884 for size in count(1):
885 for i in range(15):
886 yield size
887 sizes = gen_sizes()
888 while n < len(contents):
889 size = min(next(sizes), len(contents) - n)
890 self.assertEquals(bufio.write(contents[n:n+size]), size)
891 intermediate_func(bufio)
892 n += size
893 bufio.flush()
894 self.assertEquals(contents,
895 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000896
Antoine Pitrou19690592009-06-12 20:14:08 +0000897 def test_writes(self):
898 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000899
Antoine Pitrou19690592009-06-12 20:14:08 +0000900 def test_writes_and_flushes(self):
901 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000902
Antoine Pitrou19690592009-06-12 20:14:08 +0000903 def test_writes_and_seeks(self):
904 def _seekabs(bufio):
905 pos = bufio.tell()
906 bufio.seek(pos + 1, 0)
907 bufio.seek(pos - 1, 0)
908 bufio.seek(pos, 0)
909 self.check_writes(_seekabs)
910 def _seekrel(bufio):
911 pos = bufio.seek(0, 1)
912 bufio.seek(+1, 1)
913 bufio.seek(-1, 1)
914 bufio.seek(pos, 0)
915 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000916
Antoine Pitrou19690592009-06-12 20:14:08 +0000917 def test_writes_and_truncates(self):
918 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000919
Antoine Pitrou19690592009-06-12 20:14:08 +0000920 def test_write_non_blocking(self):
921 raw = self.MockNonBlockWriterIO()
922 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000923
Antoine Pitrou19690592009-06-12 20:14:08 +0000924 self.assertEquals(bufio.write(b"abcd"), 4)
925 self.assertEquals(bufio.write(b"efghi"), 5)
926 # 1 byte will be written, the rest will be buffered
927 raw.block_on(b"k")
928 self.assertEquals(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000929
Antoine Pitrou19690592009-06-12 20:14:08 +0000930 # 8 bytes will be written, 8 will be buffered and the rest will be lost
931 raw.block_on(b"0")
932 try:
933 bufio.write(b"opqrwxyz0123456789")
934 except self.BlockingIOError as e:
935 written = e.characters_written
936 else:
937 self.fail("BlockingIOError should have been raised")
938 self.assertEquals(written, 16)
939 self.assertEquals(raw.pop_written(),
940 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000941
Antoine Pitrou19690592009-06-12 20:14:08 +0000942 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
943 s = raw.pop_written()
944 # Previously buffered bytes were flushed
945 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000946
Antoine Pitrou19690592009-06-12 20:14:08 +0000947 def test_write_and_rewind(self):
948 raw = io.BytesIO()
949 bufio = self.tp(raw, 4)
950 self.assertEqual(bufio.write(b"abcdef"), 6)
951 self.assertEqual(bufio.tell(), 6)
952 bufio.seek(0, 0)
953 self.assertEqual(bufio.write(b"XY"), 2)
954 bufio.seek(6, 0)
955 self.assertEqual(raw.getvalue(), b"XYcdef")
956 self.assertEqual(bufio.write(b"123456"), 6)
957 bufio.flush()
958 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000959
Antoine Pitrou19690592009-06-12 20:14:08 +0000960 def test_flush(self):
961 writer = self.MockRawIO()
962 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000963 bufio.write(b"abc")
964 bufio.flush()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000965 self.assertEquals(b"abc", writer._write_stack[0])
966
Antoine Pitrou19690592009-06-12 20:14:08 +0000967 def test_destructor(self):
968 writer = self.MockRawIO()
969 bufio = self.tp(writer, 8)
970 bufio.write(b"abc")
971 del bufio
972 support.gc_collect()
973 self.assertEquals(b"abc", writer._write_stack[0])
974
975 def test_truncate(self):
976 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000977 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000978 bufio = self.tp(raw, 8)
979 bufio.write(b"abcdef")
980 self.assertEqual(bufio.truncate(3), 3)
981 self.assertEqual(bufio.tell(), 3)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000982 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000983 self.assertEqual(f.read(), b"abc")
984
985 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000986 try:
Antoine Pitrou19690592009-06-12 20:14:08 +0000987 # Write out many bytes from many threads and test they were
988 # all flushed.
989 N = 1000
990 contents = bytes(range(256)) * N
991 sizes = cycle([1, 19])
992 n = 0
993 queue = deque()
994 while n < len(contents):
995 size = next(sizes)
996 queue.append(contents[n:n+size])
997 n += size
998 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000999 # We use a real file object because it allows us to
1000 # exercise situations where the GIL is released before
1001 # writing the buffer to the raw streams. This is in addition
1002 # to concurrency issues due to switching threads in the middle
1003 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001004 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001005 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001006 errors = []
1007 def f():
1008 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001009 while True:
1010 try:
1011 s = queue.popleft()
1012 except IndexError:
1013 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001014 bufio.write(s)
1015 except Exception as e:
1016 errors.append(e)
1017 raise
1018 threads = [threading.Thread(target=f) for x in range(20)]
1019 for t in threads:
1020 t.start()
1021 time.sleep(0.02) # yield
1022 for t in threads:
1023 t.join()
1024 self.assertFalse(errors,
1025 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001027 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001028 s = f.read()
1029 for i in range(256):
1030 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001031 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001032 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001033
Antoine Pitrou19690592009-06-12 20:14:08 +00001034 def test_misbehaved_io(self):
1035 rawio = self.MisbehavedRawIO()
1036 bufio = self.tp(rawio, 5)
1037 self.assertRaises(IOError, bufio.seek, 0)
1038 self.assertRaises(IOError, bufio.tell)
1039 self.assertRaises(IOError, bufio.write, b"abcdef")
1040
1041 def test_max_buffer_size_deprecation(self):
1042 with support.check_warnings() as w:
1043 warnings.simplefilter("always", DeprecationWarning)
1044 self.tp(self.MockRawIO(), 8, 12)
1045 self.assertEqual(len(w.warnings), 1)
1046 warning = w.warnings[0]
1047 self.assertTrue(warning.category is DeprecationWarning)
1048 self.assertEqual(str(warning.message),
1049 "max_buffer_size is deprecated")
1050
1051
1052class CBufferedWriterTest(BufferedWriterTest):
1053 tp = io.BufferedWriter
1054
1055 def test_constructor(self):
1056 BufferedWriterTest.test_constructor(self)
1057 # The allocation can succeed on 32-bit builds, e.g. with more
1058 # than 2GB RAM and a 64-bit kernel.
1059 if sys.maxsize > 0x7FFFFFFF:
1060 rawio = self.MockRawIO()
1061 bufio = self.tp(rawio)
1062 self.assertRaises((OverflowError, MemoryError, ValueError),
1063 bufio.__init__, rawio, sys.maxsize)
1064
1065 def test_initialization(self):
1066 rawio = self.MockRawIO()
1067 bufio = self.tp(rawio)
1068 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1069 self.assertRaises(ValueError, bufio.write, b"def")
1070 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1071 self.assertRaises(ValueError, bufio.write, b"def")
1072 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1073 self.assertRaises(ValueError, bufio.write, b"def")
1074
1075 def test_garbage_collection(self):
1076 # C BufferedWriter objects are collected, and collecting them flushes
1077 # all data to disk.
1078 # The Python version has __del__, so it ends into gc.garbage instead
1079 rawio = self.FileIO(support.TESTFN, "w+b")
1080 f = self.tp(rawio)
1081 f.write(b"123xxx")
1082 f.x = f
1083 wr = weakref.ref(f)
1084 del f
1085 support.gc_collect()
1086 self.assert_(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001087 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001088 self.assertEqual(f.read(), b"123xxx")
1089
1090
1091class PyBufferedWriterTest(BufferedWriterTest):
1092 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001093
1094class BufferedRWPairTest(unittest.TestCase):
1095
Antoine Pitrou19690592009-06-12 20:14:08 +00001096 def test_constructor(self):
1097 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001098 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001099
Antoine Pitrou19690592009-06-12 20:14:08 +00001100 def test_detach(self):
1101 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1102 self.assertRaises(self.UnsupportedOperation, pair.detach)
1103
1104 def test_constructor_max_buffer_size_deprecation(self):
1105 with support.check_warnings() as w:
1106 warnings.simplefilter("always", DeprecationWarning)
1107 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1108 self.assertEqual(len(w.warnings), 1)
1109 warning = w.warnings[0]
1110 self.assertTrue(warning.category is DeprecationWarning)
1111 self.assertEqual(str(warning.message),
1112 "max_buffer_size is deprecated")
1113
1114 def test_constructor_with_not_readable(self):
1115 class NotReadable(MockRawIO):
1116 def readable(self):
1117 return False
1118
1119 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1120
1121 def test_constructor_with_not_writeable(self):
1122 class NotWriteable(MockRawIO):
1123 def writable(self):
1124 return False
1125
1126 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1127
1128 def test_read(self):
1129 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1130
1131 self.assertEqual(pair.read(3), b"abc")
1132 self.assertEqual(pair.read(1), b"d")
1133 self.assertEqual(pair.read(), b"ef")
1134
1135 def test_read1(self):
1136 # .read1() is delegated to the underlying reader object, so this test
1137 # can be shallow.
1138 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1139
1140 self.assertEqual(pair.read1(3), b"abc")
1141
1142 def test_readinto(self):
1143 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1144
1145 data = bytearray(5)
1146 self.assertEqual(pair.readinto(data), 5)
1147 self.assertEqual(data, b"abcde")
1148
1149 def test_write(self):
1150 w = self.MockRawIO()
1151 pair = self.tp(self.MockRawIO(), w)
1152
1153 pair.write(b"abc")
1154 pair.flush()
1155 pair.write(b"def")
1156 pair.flush()
1157 self.assertEqual(w._write_stack, [b"abc", b"def"])
1158
1159 def test_peek(self):
1160 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1161
1162 self.assertTrue(pair.peek(3).startswith(b"abc"))
1163 self.assertEqual(pair.read(3), b"abc")
1164
1165 def test_readable(self):
1166 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1167 self.assertTrue(pair.readable())
1168
1169 def test_writeable(self):
1170 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1171 self.assertTrue(pair.writable())
1172
1173 def test_seekable(self):
1174 # BufferedRWPairs are never seekable, even if their readers and writers
1175 # are.
1176 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1177 self.assertFalse(pair.seekable())
1178
1179 # .flush() is delegated to the underlying writer object and has been
1180 # tested in the test_write method.
1181
1182 def test_close_and_closed(self):
1183 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1184 self.assertFalse(pair.closed)
1185 pair.close()
1186 self.assertTrue(pair.closed)
1187
1188 def test_isatty(self):
1189 class SelectableIsAtty(MockRawIO):
1190 def __init__(self, isatty):
1191 MockRawIO.__init__(self)
1192 self._isatty = isatty
1193
1194 def isatty(self):
1195 return self._isatty
1196
1197 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1198 self.assertFalse(pair.isatty())
1199
1200 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1201 self.assertTrue(pair.isatty())
1202
1203 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1204 self.assertTrue(pair.isatty())
1205
1206 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1207 self.assertTrue(pair.isatty())
1208
1209class CBufferedRWPairTest(BufferedRWPairTest):
1210 tp = io.BufferedRWPair
1211
1212class PyBufferedRWPairTest(BufferedRWPairTest):
1213 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001214
1215
Antoine Pitrou19690592009-06-12 20:14:08 +00001216class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1217 read_mode = "rb+"
1218 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001219
Antoine Pitrou19690592009-06-12 20:14:08 +00001220 def test_constructor(self):
1221 BufferedReaderTest.test_constructor(self)
1222 BufferedWriterTest.test_constructor(self)
1223
1224 def test_read_and_write(self):
1225 raw = self.MockRawIO((b"asdf", b"ghjk"))
1226 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001227
1228 self.assertEqual(b"as", rw.read(2))
1229 rw.write(b"ddd")
1230 rw.write(b"eee")
1231 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001232 self.assertEqual(b"ghjk", rw.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001233 self.assertEquals(b"dddeee", raw._write_stack[0])
1234
Antoine Pitrou19690592009-06-12 20:14:08 +00001235 def test_seek_and_tell(self):
1236 raw = self.BytesIO(b"asdfghjkl")
1237 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001238
1239 self.assertEquals(b"as", rw.read(2))
1240 self.assertEquals(2, rw.tell())
1241 rw.seek(0, 0)
1242 self.assertEquals(b"asdf", rw.read(4))
1243
1244 rw.write(b"asdf")
1245 rw.seek(0, 0)
1246 self.assertEquals(b"asdfasdfl", rw.read())
1247 self.assertEquals(9, rw.tell())
1248 rw.seek(-4, 2)
1249 self.assertEquals(5, rw.tell())
1250 rw.seek(2, 1)
1251 self.assertEquals(7, rw.tell())
1252 self.assertEquals(b"fl", rw.read(11))
1253 self.assertRaises(TypeError, rw.seek, 0.0)
1254
Antoine Pitrou19690592009-06-12 20:14:08 +00001255 def check_flush_and_read(self, read_func):
1256 raw = self.BytesIO(b"abcdefghi")
1257 bufio = self.tp(raw)
1258
1259 self.assertEquals(b"ab", read_func(bufio, 2))
1260 bufio.write(b"12")
1261 self.assertEquals(b"ef", read_func(bufio, 2))
1262 self.assertEquals(6, bufio.tell())
1263 bufio.flush()
1264 self.assertEquals(6, bufio.tell())
1265 self.assertEquals(b"ghi", read_func(bufio))
1266 raw.seek(0, 0)
1267 raw.write(b"XYZ")
1268 # flush() resets the read buffer
1269 bufio.flush()
1270 bufio.seek(0, 0)
1271 self.assertEquals(b"XYZ", read_func(bufio, 3))
1272
1273 def test_flush_and_read(self):
1274 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1275
1276 def test_flush_and_readinto(self):
1277 def _readinto(bufio, n=-1):
1278 b = bytearray(n if n >= 0 else 9999)
1279 n = bufio.readinto(b)
1280 return bytes(b[:n])
1281 self.check_flush_and_read(_readinto)
1282
1283 def test_flush_and_peek(self):
1284 def _peek(bufio, n=-1):
1285 # This relies on the fact that the buffer can contain the whole
1286 # raw stream, otherwise peek() can return less.
1287 b = bufio.peek(n)
1288 if n != -1:
1289 b = b[:n]
1290 bufio.seek(len(b), 1)
1291 return b
1292 self.check_flush_and_read(_peek)
1293
1294 def test_flush_and_write(self):
1295 raw = self.BytesIO(b"abcdefghi")
1296 bufio = self.tp(raw)
1297
1298 bufio.write(b"123")
1299 bufio.flush()
1300 bufio.write(b"45")
1301 bufio.flush()
1302 bufio.seek(0, 0)
1303 self.assertEquals(b"12345fghi", raw.getvalue())
1304 self.assertEquals(b"12345fghi", bufio.read())
1305
1306 def test_threads(self):
1307 BufferedReaderTest.test_threads(self)
1308 BufferedWriterTest.test_threads(self)
1309
1310 def test_writes_and_peek(self):
1311 def _peek(bufio):
1312 bufio.peek(1)
1313 self.check_writes(_peek)
1314 def _peek(bufio):
1315 pos = bufio.tell()
1316 bufio.seek(-1, 1)
1317 bufio.peek(1)
1318 bufio.seek(pos, 0)
1319 self.check_writes(_peek)
1320
1321 def test_writes_and_reads(self):
1322 def _read(bufio):
1323 bufio.seek(-1, 1)
1324 bufio.read(1)
1325 self.check_writes(_read)
1326
1327 def test_writes_and_read1s(self):
1328 def _read1(bufio):
1329 bufio.seek(-1, 1)
1330 bufio.read1(1)
1331 self.check_writes(_read1)
1332
1333 def test_writes_and_readintos(self):
1334 def _read(bufio):
1335 bufio.seek(-1, 1)
1336 bufio.readinto(bytearray(1))
1337 self.check_writes(_read)
1338
1339 def test_misbehaved_io(self):
1340 BufferedReaderTest.test_misbehaved_io(self)
1341 BufferedWriterTest.test_misbehaved_io(self)
1342
1343class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1344 tp = io.BufferedRandom
1345
1346 def test_constructor(self):
1347 BufferedRandomTest.test_constructor(self)
1348 # The allocation can succeed on 32-bit builds, e.g. with more
1349 # than 2GB RAM and a 64-bit kernel.
1350 if sys.maxsize > 0x7FFFFFFF:
1351 rawio = self.MockRawIO()
1352 bufio = self.tp(rawio)
1353 self.assertRaises((OverflowError, MemoryError, ValueError),
1354 bufio.__init__, rawio, sys.maxsize)
1355
1356 def test_garbage_collection(self):
1357 CBufferedReaderTest.test_garbage_collection(self)
1358 CBufferedWriterTest.test_garbage_collection(self)
1359
1360class PyBufferedRandomTest(BufferedRandomTest):
1361 tp = pyio.BufferedRandom
1362
1363
Christian Heimes1a6387e2008-03-26 12:49:49 +00001364# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1365# properties:
1366# - A single output character can correspond to many bytes of input.
1367# - The number of input bytes to complete the character can be
1368# undetermined until the last input byte is received.
1369# - The number of input bytes can vary depending on previous input.
1370# - A single input byte can correspond to many characters of output.
1371# - The number of output characters can be undetermined until the
1372# last input byte is received.
1373# - The number of output characters can vary depending on previous input.
1374
1375class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1376 """
1377 For testing seek/tell behavior with a stateful, buffering decoder.
1378
1379 Input is a sequence of words. Words may be fixed-length (length set
1380 by input) or variable-length (period-terminated). In variable-length
1381 mode, extra periods are ignored. Possible words are:
1382 - 'i' followed by a number sets the input length, I (maximum 99).
1383 When I is set to 0, words are space-terminated.
1384 - 'o' followed by a number sets the output length, O (maximum 99).
1385 - Any other word is converted into a word followed by a period on
1386 the output. The output word consists of the input word truncated
1387 or padded out with hyphens to make its length equal to O. If O
1388 is 0, the word is output verbatim without truncating or padding.
1389 I and O are initially set to 1. When I changes, any buffered input is
1390 re-scanned according to the new I. EOF also terminates the last word.
1391 """
1392
1393 def __init__(self, errors='strict'):
1394 codecs.IncrementalDecoder.__init__(self, errors)
1395 self.reset()
1396
1397 def __repr__(self):
1398 return '<SID %x>' % id(self)
1399
1400 def reset(self):
1401 self.i = 1
1402 self.o = 1
1403 self.buffer = bytearray()
1404
1405 def getstate(self):
1406 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1407 return bytes(self.buffer), i*100 + o
1408
1409 def setstate(self, state):
1410 buffer, io = state
1411 self.buffer = bytearray(buffer)
1412 i, o = divmod(io, 100)
1413 self.i, self.o = i ^ 1, o ^ 1
1414
1415 def decode(self, input, final=False):
1416 output = ''
1417 for b in input:
1418 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001419 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001420 if self.buffer:
1421 output += self.process_word()
1422 else:
1423 self.buffer.append(b)
1424 else: # fixed-length, terminate after self.i bytes
1425 self.buffer.append(b)
1426 if len(self.buffer) == self.i:
1427 output += self.process_word()
1428 if final and self.buffer: # EOF terminates the last word
1429 output += self.process_word()
1430 return output
1431
1432 def process_word(self):
1433 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001434 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001435 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001436 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001437 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1438 else:
1439 output = self.buffer.decode('ascii')
1440 if len(output) < self.o:
1441 output += '-'*self.o # pad out with hyphens
1442 if self.o:
1443 output = output[:self.o] # truncate to output length
1444 output += '.'
1445 self.buffer = bytearray()
1446 return output
1447
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001448 codecEnabled = False
1449
1450 @classmethod
1451 def lookupTestDecoder(cls, name):
1452 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001453 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001454 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001455 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001456 incrementalencoder=None,
1457 streamreader=None, streamwriter=None,
1458 incrementaldecoder=cls)
1459
1460# Register the previous decoder for testing.
1461# Disabled by default, tests will enable it.
1462codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1463
1464
Christian Heimes1a6387e2008-03-26 12:49:49 +00001465class StatefulIncrementalDecoderTest(unittest.TestCase):
1466 """
1467 Make sure the StatefulIncrementalDecoder actually works.
1468 """
1469
1470 test_cases = [
1471 # I=1, O=1 (fixed-length input == fixed-length output)
1472 (b'abcd', False, 'a.b.c.d.'),
1473 # I=0, O=0 (variable-length input, variable-length output)
1474 (b'oiabcd', True, 'abcd.'),
1475 # I=0, O=0 (should ignore extra periods)
1476 (b'oi...abcd...', True, 'abcd.'),
1477 # I=0, O=6 (variable-length input, fixed-length output)
1478 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1479 # I=2, O=6 (fixed-length input < fixed-length output)
1480 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1481 # I=6, O=3 (fixed-length input > fixed-length output)
1482 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1483 # I=0, then 3; O=29, then 15 (with longer output)
1484 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1485 'a----------------------------.' +
1486 'b----------------------------.' +
1487 'cde--------------------------.' +
1488 'abcdefghijabcde.' +
1489 'a.b------------.' +
1490 '.c.------------.' +
1491 'd.e------------.' +
1492 'k--------------.' +
1493 'l--------------.' +
1494 'm--------------.')
1495 ]
1496
Antoine Pitrou19690592009-06-12 20:14:08 +00001497 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001498 # Try a few one-shot test cases.
1499 for input, eof, output in self.test_cases:
1500 d = StatefulIncrementalDecoder()
1501 self.assertEquals(d.decode(input, eof), output)
1502
1503 # Also test an unfinished decode, followed by forcing EOF.
1504 d = StatefulIncrementalDecoder()
1505 self.assertEquals(d.decode(b'oiabcd'), '')
1506 self.assertEquals(d.decode(b'', 1), 'abcd.')
1507
1508class TextIOWrapperTest(unittest.TestCase):
1509
1510 def setUp(self):
1511 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1512 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001513 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001514
1515 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001516 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001517
Antoine Pitrou19690592009-06-12 20:14:08 +00001518 def test_constructor(self):
1519 r = self.BytesIO(b"\xc3\xa9\n\n")
1520 b = self.BufferedReader(r, 1000)
1521 t = self.TextIOWrapper(b)
1522 t.__init__(b, encoding="latin1", newline="\r\n")
1523 self.assertEquals(t.encoding, "latin1")
1524 self.assertEquals(t.line_buffering, False)
1525 t.__init__(b, encoding="utf8", line_buffering=True)
1526 self.assertEquals(t.encoding, "utf8")
1527 self.assertEquals(t.line_buffering, True)
1528 self.assertEquals("\xe9\n", t.readline())
1529 self.assertRaises(TypeError, t.__init__, b, newline=42)
1530 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1531
1532 def test_detach(self):
1533 r = self.BytesIO()
1534 b = self.BufferedWriter(r)
1535 t = self.TextIOWrapper(b)
1536 self.assertIs(t.detach(), b)
1537
1538 t = self.TextIOWrapper(b, encoding="ascii")
1539 t.write("howdy")
1540 self.assertFalse(r.getvalue())
1541 t.detach()
1542 self.assertEqual(r.getvalue(), b"howdy")
1543 self.assertRaises(ValueError, t.detach)
1544
1545 def test_repr(self):
1546 raw = self.BytesIO("hello".encode("utf-8"))
1547 b = self.BufferedReader(raw)
1548 t = self.TextIOWrapper(b, encoding="utf-8")
1549 modname = self.TextIOWrapper.__module__
1550 self.assertEqual(repr(t),
1551 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1552 raw.name = "dummy"
1553 self.assertEqual(repr(t),
1554 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1555 raw.name = b"dummy"
1556 self.assertEqual(repr(t),
1557 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1558
1559 def test_line_buffering(self):
1560 r = self.BytesIO()
1561 b = self.BufferedWriter(r, 1000)
1562 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1563 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001564 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001565 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001566 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001567 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001568 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1569
Antoine Pitrou19690592009-06-12 20:14:08 +00001570 def test_encoding(self):
1571 # Check the encoding attribute is always set, and valid
1572 b = self.BytesIO()
1573 t = self.TextIOWrapper(b, encoding="utf8")
1574 self.assertEqual(t.encoding, "utf8")
1575 t = self.TextIOWrapper(b)
1576 self.assert_(t.encoding is not None)
1577 codecs.lookup(t.encoding)
1578
1579 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001580 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001581 b = self.BytesIO(b"abc\n\xff\n")
1582 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001583 self.assertRaises(UnicodeError, t.read)
1584 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001585 b = self.BytesIO(b"abc\n\xff\n")
1586 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001587 self.assertRaises(UnicodeError, t.read)
1588 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001589 b = self.BytesIO(b"abc\n\xff\n")
1590 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001591 self.assertEquals(t.read(), "abc\n\n")
1592 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001593 b = self.BytesIO(b"abc\n\xff\n")
1594 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1595 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001596
Antoine Pitrou19690592009-06-12 20:14:08 +00001597 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001598 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001599 b = self.BytesIO()
1600 t = self.TextIOWrapper(b, encoding="ascii")
1601 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001602 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001603 b = self.BytesIO()
1604 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1605 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001606 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001607 b = self.BytesIO()
1608 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001609 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001610 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001611 t.flush()
1612 self.assertEquals(b.getvalue(), b"abcdef\n")
1613 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001614 b = self.BytesIO()
1615 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001616 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001618 t.flush()
1619 self.assertEquals(b.getvalue(), b"abc?def\n")
1620
Antoine Pitrou19690592009-06-12 20:14:08 +00001621 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001622 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1623
1624 tests = [
1625 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1626 [ '', input_lines ],
1627 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1628 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1629 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1630 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001631 encodings = (
1632 'utf-8', 'latin-1',
1633 'utf-16', 'utf-16-le', 'utf-16-be',
1634 'utf-32', 'utf-32-le', 'utf-32-be',
1635 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001636
1637 # Try a range of buffer sizes to test the case where \r is the last
1638 # character in TextIOWrapper._pending_line.
1639 for encoding in encodings:
1640 # XXX: str.encode() should return bytes
1641 data = bytes(''.join(input_lines).encode(encoding))
1642 for do_reads in (False, True):
1643 for bufsize in range(1, 10):
1644 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001645 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1646 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001647 encoding=encoding)
1648 if do_reads:
1649 got_lines = []
1650 while True:
1651 c2 = textio.read(2)
1652 if c2 == '':
1653 break
1654 self.assertEquals(len(c2), 2)
1655 got_lines.append(c2 + textio.readline())
1656 else:
1657 got_lines = list(textio)
1658
1659 for got_line, exp_line in zip(got_lines, exp_lines):
1660 self.assertEquals(got_line, exp_line)
1661 self.assertEquals(len(got_lines), len(exp_lines))
1662
Antoine Pitrou19690592009-06-12 20:14:08 +00001663 def test_newlines_input(self):
1664 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001665 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1666 for newline, expected in [
1667 (None, normalized.decode("ascii").splitlines(True)),
1668 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001669 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1670 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1671 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001672 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001673 buf = self.BytesIO(testdata)
1674 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001675 self.assertEquals(txt.readlines(), expected)
1676 txt.seek(0)
1677 self.assertEquals(txt.read(), "".join(expected))
1678
Antoine Pitrou19690592009-06-12 20:14:08 +00001679 def test_newlines_output(self):
1680 testdict = {
1681 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1682 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1683 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1684 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1685 }
1686 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1687 for newline, expected in tests:
1688 buf = self.BytesIO()
1689 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1690 txt.write("AAA\nB")
1691 txt.write("BB\nCCC\n")
1692 txt.write("X\rY\r\nZ")
1693 txt.flush()
1694 self.assertEquals(buf.closed, False)
1695 self.assertEquals(buf.getvalue(), expected)
1696
1697 def test_destructor(self):
1698 l = []
1699 base = self.BytesIO
1700 class MyBytesIO(base):
1701 def close(self):
1702 l.append(self.getvalue())
1703 base.close(self)
1704 b = MyBytesIO()
1705 t = self.TextIOWrapper(b, encoding="ascii")
1706 t.write("abc")
1707 del t
1708 support.gc_collect()
1709 self.assertEquals([b"abc"], l)
1710
1711 def test_override_destructor(self):
1712 record = []
1713 class MyTextIO(self.TextIOWrapper):
1714 def __del__(self):
1715 record.append(1)
1716 try:
1717 f = super(MyTextIO, self).__del__
1718 except AttributeError:
1719 pass
1720 else:
1721 f()
1722 def close(self):
1723 record.append(2)
1724 super(MyTextIO, self).close()
1725 def flush(self):
1726 record.append(3)
1727 super(MyTextIO, self).flush()
1728 b = self.BytesIO()
1729 t = MyTextIO(b, encoding="ascii")
1730 del t
1731 support.gc_collect()
1732 self.assertEqual(record, [1, 2, 3])
1733
1734 def test_error_through_destructor(self):
1735 # Test that the exception state is not modified by a destructor,
1736 # even if close() fails.
1737 rawio = self.CloseFailureIO()
1738 def f():
1739 self.TextIOWrapper(rawio).xyzzy
1740 with support.captured_output("stderr") as s:
1741 self.assertRaises(AttributeError, f)
1742 s = s.getvalue().strip()
1743 if s:
1744 # The destructor *may* have printed an unraisable error, check it
1745 self.assertEqual(len(s.splitlines()), 1)
1746 self.assert_(s.startswith("Exception IOError: "), s)
1747 self.assert_(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001748
1749 # Systematic tests of the text I/O API
1750
Antoine Pitrou19690592009-06-12 20:14:08 +00001751 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001752 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1753 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001754 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001755 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001756 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001757 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001758 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001759 f._CHUNK_SIZE = chunksize
1760 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001761 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001762 cookie = f.tell()
1763 self.assertEquals(f.seek(0), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001764 self.assertEquals(f.read(2), "ab")
1765 self.assertEquals(f.read(1), "c")
1766 self.assertEquals(f.read(1), "")
1767 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001768 self.assertEquals(f.tell(), cookie)
1769 self.assertEquals(f.seek(0), 0)
1770 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001771 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001772 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001773 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001774 if enc.startswith("utf"):
1775 self.multi_line_test(f, enc)
1776 f.close()
1777
1778 def multi_line_test(self, f, enc):
1779 f.seek(0)
1780 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001781 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001782 wlines = []
1783 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1784 chars = []
1785 for i in range(size):
1786 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001787 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001788 wlines.append((f.tell(), line))
1789 f.write(line)
1790 f.seek(0)
1791 rlines = []
1792 while True:
1793 pos = f.tell()
1794 line = f.readline()
1795 if not line:
1796 break
1797 rlines.append((pos, line))
1798 self.assertEquals(rlines, wlines)
1799
Antoine Pitrou19690592009-06-12 20:14:08 +00001800 def test_telling(self):
1801 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001802 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001803 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001804 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001805 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001806 p2 = f.tell()
1807 f.seek(0)
1808 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001809 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001810 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001811 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001812 self.assertEquals(f.tell(), p2)
1813 f.seek(0)
1814 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001815 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001816 self.assertRaises(IOError, f.tell)
1817 self.assertEquals(f.tell(), p2)
1818 f.close()
1819
Antoine Pitrou19690592009-06-12 20:14:08 +00001820 def test_seeking(self):
1821 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001822 prefix_size = chunk_size - 2
1823 u_prefix = "a" * prefix_size
1824 prefix = bytes(u_prefix.encode("utf-8"))
1825 self.assertEquals(len(u_prefix), len(prefix))
1826 u_suffix = "\u8888\n"
1827 suffix = bytes(u_suffix.encode("utf-8"))
1828 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001829 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001830 f.write(line*2)
1831 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001832 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001833 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001834 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835 self.assertEquals(f.tell(), prefix_size)
1836 self.assertEquals(f.readline(), u_suffix)
1837
Antoine Pitrou19690592009-06-12 20:14:08 +00001838 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839 # Regression test for a specific bug
1840 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001841 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001842 f.write(data)
1843 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001844 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001845 f._CHUNK_SIZE # Just test that it exists
1846 f._CHUNK_SIZE = 2
1847 f.readline()
1848 f.tell()
1849
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 def test_seek_and_tell(self):
1851 #Test seek/tell using the StatefulIncrementalDecoder.
1852 # Make test faster by doing smaller seeks
1853 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001854
Antoine Pitrou19690592009-06-12 20:14:08 +00001855 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001856 """Tell/seek to various points within a data stream and ensure
1857 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001858 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 f.write(data)
1860 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001861 f = self.open(support.TESTFN, encoding='test_decoder')
1862 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001863 decoded = f.read()
1864 f.close()
1865
1866 for i in range(min_pos, len(decoded) + 1): # seek positions
1867 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001868 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001869 self.assertEquals(f.read(i), decoded[:i])
1870 cookie = f.tell()
1871 self.assertEquals(f.read(j), decoded[i:i + j])
1872 f.seek(cookie)
1873 self.assertEquals(f.read(), decoded[i:])
1874 f.close()
1875
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001876 # Enable the test decoder.
1877 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001878
1879 # Run the tests.
1880 try:
1881 # Try each test case.
1882 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001883 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001884
1885 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1887 offset = CHUNK_SIZE - len(input)//2
1888 prefix = b'.'*offset
1889 # Don't bother seeking into the prefix (takes too long).
1890 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001891 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001892
1893 # Ensure our test decoder won't interfere with subsequent tests.
1894 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001895 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896
Antoine Pitrou19690592009-06-12 20:14:08 +00001897 def test_encoded_writes(self):
1898 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899 tests = ("utf-16",
1900 "utf-16-le",
1901 "utf-16-be",
1902 "utf-32",
1903 "utf-32-le",
1904 "utf-32-be")
1905 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001906 buf = self.BytesIO()
1907 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908 # Check if the BOM is written only once (see issue1753).
1909 f.write(data)
1910 f.write(data)
1911 f.seek(0)
1912 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001913 f.seek(0)
1914 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001915 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1916
Antoine Pitrou19690592009-06-12 20:14:08 +00001917 def test_unreadable(self):
1918 class UnReadable(self.BytesIO):
1919 def readable(self):
1920 return False
1921 txt = self.TextIOWrapper(UnReadable())
1922 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001923
Antoine Pitrou19690592009-06-12 20:14:08 +00001924 def test_read_one_by_one(self):
1925 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001926 reads = ""
1927 while True:
1928 c = txt.read(1)
1929 if not c:
1930 break
1931 reads += c
1932 self.assertEquals(reads, "AA\nBB")
1933
1934 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001935 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001936 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001937 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001938 reads = ""
1939 while True:
1940 c = txt.read(128)
1941 if not c:
1942 break
1943 reads += c
1944 self.assertEquals(reads, "A"*127+"\nB")
1945
1946 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001947 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001948
1949 # read one char at a time
1950 reads = ""
1951 while True:
1952 c = txt.read(1)
1953 if not c:
1954 break
1955 reads += c
1956 self.assertEquals(reads, self.normalized)
1957
1958 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001959 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001960 txt._CHUNK_SIZE = 4
1961
1962 reads = ""
1963 while True:
1964 c = txt.read(4)
1965 if not c:
1966 break
1967 reads += c
1968 self.assertEquals(reads, self.normalized)
1969
1970 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001971 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 txt._CHUNK_SIZE = 4
1973
1974 reads = txt.read(4)
1975 reads += txt.read(4)
1976 reads += txt.readline()
1977 reads += txt.readline()
1978 reads += txt.readline()
1979 self.assertEquals(reads, self.normalized)
1980
1981 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001982 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 txt._CHUNK_SIZE = 4
1984
1985 reads = txt.read(4)
1986 reads += txt.read()
1987 self.assertEquals(reads, self.normalized)
1988
1989 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001991 txt._CHUNK_SIZE = 4
1992
1993 reads = txt.read(4)
1994 pos = txt.tell()
1995 txt.seek(0)
1996 txt.seek(pos)
1997 self.assertEquals(txt.read(4), "BBB\n")
1998
1999 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002000 buffer = self.BytesIO(self.testdata)
2001 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002002
2003 self.assertEqual(buffer.seekable(), txt.seekable())
2004
Antoine Pitrou19690592009-06-12 20:14:08 +00002005 @unittest.skip("Issue #6213 with incremental encoders")
2006 def test_append_bom(self):
2007 # The BOM is not written again when appending to a non-empty file
2008 filename = support.TESTFN
2009 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2010 with self.open(filename, 'w', encoding=charset) as f:
2011 f.write('aaa')
2012 pos = f.tell()
2013 with self.open(filename, 'rb') as f:
2014 self.assertEquals(f.read(), 'aaa'.encode(charset))
2015
2016 with self.open(filename, 'a', encoding=charset) as f:
2017 f.write('xxx')
2018 with self.open(filename, 'rb') as f:
2019 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2020
2021 @unittest.skip("Issue #6213 with incremental encoders")
2022 def test_seek_bom(self):
2023 # Same test, but when seeking manually
2024 filename = support.TESTFN
2025 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2026 with self.open(filename, 'w', encoding=charset) as f:
2027 f.write('aaa')
2028 pos = f.tell()
2029 with self.open(filename, 'r+', encoding=charset) as f:
2030 f.seek(pos)
2031 f.write('zzz')
2032 f.seek(0)
2033 f.write('bbb')
2034 with self.open(filename, 'rb') as f:
2035 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2036
2037 def test_errors_property(self):
2038 with self.open(support.TESTFN, "w") as f:
2039 self.assertEqual(f.errors, "strict")
2040 with self.open(support.TESTFN, "w", errors="replace") as f:
2041 self.assertEqual(f.errors, "replace")
2042
2043
2044class CTextIOWrapperTest(TextIOWrapperTest):
2045
2046 def test_initialization(self):
2047 r = self.BytesIO(b"\xc3\xa9\n\n")
2048 b = self.BufferedReader(r, 1000)
2049 t = self.TextIOWrapper(b)
2050 self.assertRaises(TypeError, t.__init__, b, newline=42)
2051 self.assertRaises(ValueError, t.read)
2052 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2053 self.assertRaises(ValueError, t.read)
2054
2055 def test_garbage_collection(self):
2056 # C TextIOWrapper objects are collected, and collecting them flushes
2057 # all data to disk.
2058 # The Python version has __del__, so it ends in gc.garbage instead.
2059 rawio = io.FileIO(support.TESTFN, "wb")
2060 b = self.BufferedWriter(rawio)
2061 t = self.TextIOWrapper(b, encoding="ascii")
2062 t.write("456def")
2063 t.x = t
2064 wr = weakref.ref(t)
2065 del t
2066 support.gc_collect()
2067 self.assert_(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002068 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002069 self.assertEqual(f.read(), b"456def")
2070
2071class PyTextIOWrapperTest(TextIOWrapperTest):
2072 pass
2073
2074
2075class IncrementalNewlineDecoderTest(unittest.TestCase):
2076
2077 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002078 # UTF-8 specific tests for a newline decoder
2079 def _check_decode(b, s, **kwargs):
2080 # We exercise getstate() / setstate() as well as decode()
2081 state = decoder.getstate()
2082 self.assertEquals(decoder.decode(b, **kwargs), s)
2083 decoder.setstate(state)
2084 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002085
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002086 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002087
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002088 _check_decode(b'\xe8', "")
2089 _check_decode(b'\xa2', "")
2090 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002091
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002092 _check_decode(b'\xe8', "")
2093 _check_decode(b'\xa2', "")
2094 _check_decode(b'\x88', "\u8888")
2095
2096 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002097 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2098
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002099 decoder.reset()
2100 _check_decode(b'\n', "\n")
2101 _check_decode(b'\r', "")
2102 _check_decode(b'', "\n", final=True)
2103 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002105 _check_decode(b'\r', "")
2106 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002108 _check_decode(b'\r\r\n', "\n\n")
2109 _check_decode(b'\r', "")
2110 _check_decode(b'\r', "\n")
2111 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002112
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002113 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2114 _check_decode(b'\xe8\xa2\x88', "\u8888")
2115 _check_decode(b'\n', "\n")
2116 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2117 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118
Antoine Pitrou19690592009-06-12 20:14:08 +00002119 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002120 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002121 if encoding is not None:
2122 encoder = codecs.getincrementalencoder(encoding)()
2123 def _decode_bytewise(s):
2124 # Decode one byte at a time
2125 for b in encoder.encode(s):
2126 result.append(decoder.decode(b))
2127 else:
2128 encoder = None
2129 def _decode_bytewise(s):
2130 # Decode one char at a time
2131 for c in s:
2132 result.append(decoder.decode(c))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002133 self.assertEquals(decoder.newlines, None)
2134 _decode_bytewise("abc\n\r")
2135 self.assertEquals(decoder.newlines, '\n')
2136 _decode_bytewise("\nabc")
2137 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2138 _decode_bytewise("abc\r")
2139 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2140 _decode_bytewise("abc")
2141 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2142 _decode_bytewise("abc\r")
2143 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2144 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002145 input = "abc"
2146 if encoder is not None:
2147 encoder.reset()
2148 input = encoder.encode(input)
2149 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002150 self.assertEquals(decoder.newlines, None)
2151
2152 def test_newline_decoder(self):
2153 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002154 # None meaning the IncrementalNewlineDecoder takes unicode input
2155 # rather than bytes input
2156 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002157 'utf-16', 'utf-16-le', 'utf-16-be',
2158 'utf-32', 'utf-32-le', 'utf-32-be',
2159 )
2160 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002161 decoder = enc and codecs.getincrementaldecoder(enc)()
2162 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2163 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002164 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002165 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2166 self.check_newline_decoding_utf8(decoder)
2167
2168 def test_newline_bytes(self):
2169 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2170 def _check(dec):
2171 self.assertEquals(dec.newlines, None)
2172 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2173 self.assertEquals(dec.newlines, None)
2174 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2175 self.assertEquals(dec.newlines, None)
2176 dec = self.IncrementalNewlineDecoder(None, translate=False)
2177 _check(dec)
2178 dec = self.IncrementalNewlineDecoder(None, translate=True)
2179 _check(dec)
2180
2181class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2182 pass
2183
2184class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2185 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002186
Christian Heimes1a6387e2008-03-26 12:49:49 +00002187
2188# XXX Tests for open()
2189
2190class MiscIOTest(unittest.TestCase):
2191
Benjamin Petersonad100c32008-11-20 22:06:22 +00002192 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002193 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002194
Antoine Pitrou19690592009-06-12 20:14:08 +00002195 def test___all__(self):
2196 for name in self.io.__all__:
2197 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002198 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002199 if name == "open":
2200 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002201 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002202 self.assertTrue(issubclass(obj, Exception), name)
2203 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002204 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002205
Benjamin Petersonad100c32008-11-20 22:06:22 +00002206 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002207 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002208 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002209 f.close()
2210
Antoine Pitrou19690592009-06-12 20:14:08 +00002211 f = self.open(support.TESTFN, "U")
2212 self.assertEquals(f.name, support.TESTFN)
2213 self.assertEquals(f.buffer.name, support.TESTFN)
2214 self.assertEquals(f.buffer.raw.name, support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002215 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002216 self.assertEquals(f.buffer.mode, "rb")
2217 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002218 f.close()
2219
Antoine Pitrou19690592009-06-12 20:14:08 +00002220 f = self.open(support.TESTFN, "w+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002221 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002222 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2223 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002224
Antoine Pitrou19690592009-06-12 20:14:08 +00002225 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002226 self.assertEquals(g.mode, "wb")
2227 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002228 self.assertEquals(g.name, f.fileno())
2229 self.assertEquals(g.raw.name, f.fileno())
2230 f.close()
2231 g.close()
2232
Antoine Pitrou19690592009-06-12 20:14:08 +00002233 def test_io_after_close(self):
2234 for kwargs in [
2235 {"mode": "w"},
2236 {"mode": "wb"},
2237 {"mode": "w", "buffering": 1},
2238 {"mode": "w", "buffering": 2},
2239 {"mode": "wb", "buffering": 0},
2240 {"mode": "r"},
2241 {"mode": "rb"},
2242 {"mode": "r", "buffering": 1},
2243 {"mode": "r", "buffering": 2},
2244 {"mode": "rb", "buffering": 0},
2245 {"mode": "w+"},
2246 {"mode": "w+b"},
2247 {"mode": "w+", "buffering": 1},
2248 {"mode": "w+", "buffering": 2},
2249 {"mode": "w+b", "buffering": 0},
2250 ]:
2251 f = self.open(support.TESTFN, **kwargs)
2252 f.close()
2253 self.assertRaises(ValueError, f.flush)
2254 self.assertRaises(ValueError, f.fileno)
2255 self.assertRaises(ValueError, f.isatty)
2256 self.assertRaises(ValueError, f.__iter__)
2257 if hasattr(f, "peek"):
2258 self.assertRaises(ValueError, f.peek, 1)
2259 self.assertRaises(ValueError, f.read)
2260 if hasattr(f, "read1"):
2261 self.assertRaises(ValueError, f.read1, 1024)
2262 if hasattr(f, "readinto"):
2263 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2264 self.assertRaises(ValueError, f.readline)
2265 self.assertRaises(ValueError, f.readlines)
2266 self.assertRaises(ValueError, f.seek, 0)
2267 self.assertRaises(ValueError, f.tell)
2268 self.assertRaises(ValueError, f.truncate)
2269 self.assertRaises(ValueError, f.write,
2270 b"" if "b" in kwargs['mode'] else "")
2271 self.assertRaises(ValueError, f.writelines, [])
2272 self.assertRaises(ValueError, next, f)
2273
2274 def test_blockingioerror(self):
2275 # Various BlockingIOError issues
2276 self.assertRaises(TypeError, self.BlockingIOError)
2277 self.assertRaises(TypeError, self.BlockingIOError, 1)
2278 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2279 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2280 b = self.BlockingIOError(1, "")
2281 self.assertEqual(b.characters_written, 0)
2282 class C(unicode):
2283 pass
2284 c = C("")
2285 b = self.BlockingIOError(1, c)
2286 c.b = b
2287 b.c = c
2288 wr = weakref.ref(c)
2289 del c, b
2290 support.gc_collect()
2291 self.assert_(wr() is None, wr)
2292
2293 def test_abcs(self):
2294 # Test the visible base classes are ABCs.
2295 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2296 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2297 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2298 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2299
2300 def _check_abc_inheritance(self, abcmodule):
2301 with self.open(support.TESTFN, "wb", buffering=0) as f:
2302 self.assertTrue(isinstance(f, abcmodule.IOBase))
2303 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2304 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2305 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2306 with self.open(support.TESTFN, "wb") as f:
2307 self.assertTrue(isinstance(f, abcmodule.IOBase))
2308 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2309 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2310 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2311 with self.open(support.TESTFN, "w") as f:
2312 self.assertTrue(isinstance(f, abcmodule.IOBase))
2313 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2314 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2315 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2316
2317 def test_abc_inheritance(self):
2318 # Test implementations inherit from their respective ABCs
2319 self._check_abc_inheritance(self)
2320
2321 def test_abc_inheritance_official(self):
2322 # Test implementations inherit from the official ABCs of the
2323 # baseline "io" module.
2324 self._check_abc_inheritance(io)
2325
2326class CMiscIOTest(MiscIOTest):
2327 io = io
2328
2329class PyMiscIOTest(MiscIOTest):
2330 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002331
Christian Heimes1a6387e2008-03-26 12:49:49 +00002332def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002333 tests = (CIOTest, PyIOTest,
2334 CBufferedReaderTest, PyBufferedReaderTest,
2335 CBufferedWriterTest, PyBufferedWriterTest,
2336 CBufferedRWPairTest, PyBufferedRWPairTest,
2337 CBufferedRandomTest, PyBufferedRandomTest,
2338 StatefulIncrementalDecoderTest,
2339 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2340 CTextIOWrapperTest, PyTextIOWrapperTest,
2341 CMiscIOTest, PyMiscIOTest,
2342 )
2343
2344 # Put the namespaces of the IO module we are testing and some useful mock
2345 # classes in the __dict__ of each test.
2346 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2347 MockNonBlockWriterIO)
2348 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2349 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2350 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2351 globs = globals()
2352 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2353 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2354 # Avoid turning open into a bound method.
2355 py_io_ns["open"] = pyio.OpenWrapper
2356 for test in tests:
2357 if test.__name__.startswith("C"):
2358 for name, obj in c_io_ns.items():
2359 setattr(test, name, obj)
2360 elif test.__name__.startswith("Py"):
2361 for name, obj in py_io_ns.items():
2362 setattr(test, name, obj)
2363
2364 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002365
2366if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002367 test_main()