# Source blob: d2ad2a6aa728ad70ad59a1e7293c360318176ef8
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################

from __future__ import print_function
from __future__ import unicode_literals

import os
import sys
import time
import array
import random
import unittest
import weakref
import abc
import signal
import errno
from itertools import cycle, count
from collections import deque
from test import test_support as support

import codecs
import io  # C implementation of io
import _pyio as pyio  # Python implementation of io
try:
    import threading
except ImportError:
    # Builds without thread support: tests that need threads check this.
    threading = None

# Make every class statement in this module create a new-style class.
__metaclass__ = type
# NOTE(review): presumably a Python-3-style bytes constructor supplied by
# test_support; its definition is not visible in this file.
bytes = support.py3k_bytes

def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this module's own source file in text mode purely to obtain a
    text wrapper object, and returns that object's ``_CHUNK_SIZE``.
    """
    with io.open(__file__, "r", encoding="latin1") as f:
        return f._CHUNK_SIZE


class MockRawIO:
    """Scripted raw stream for exercising buffered wrappers.

    Reads are served from a queue of chunks supplied at construction time
    and every write is recorded, so a test can inspect exactly how a
    wrapper talked to its raw stream.  A ``None`` chunk in the queue is
    handed back as-is by read() and readinto().
    """

    def __init__(self, read_stack=()):
        # Chunks still to be served by read()/readinto(), oldest first.
        self._read_stack = list(read_stack)
        # Payloads passed to write(), in call order.
        self._write_stack = []
        # Number of read()/readinto() calls made so far.
        self._reads = 0
        # Number of those calls made after the queue had run dry.
        self._extraneous_reads = 0

    def read(self, n=None):
        """Return the next queued chunk (``n`` is ignored), or b"" if none."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as EOF; any other failure is a
            # genuine error and must not be silently turned into b"".
            self._extraneous_reads += 1
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the head of the queue into ``buf``.

        Returns the number of bytes stored, 0 when the queue is empty, or
        None when the head of the queue is None.  A chunk larger than
        ``buf`` is split and its remainder stays queued.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos

class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO layered over the C RawIOBase.
    pass

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO layered over the pure-Python RawIOBase.
    pass


class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant that deliberately reports bogus results.

    Each method still performs the MockRawIO bookkeeping where it delegates,
    but the value handed back to the caller is wrong on purpose.
    """

    def write(self, b):
        # Claims twice as many bytes were written as were supplied.
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        # Returns the queued chunk repeated twice.
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report five times its length.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5

class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO layered over the C RawIOBase.
    pass

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO layered over the pure-Python RawIOBase.
    pass


class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises IOError."""

    # Class-level default; the first close() sets it to 1 on the instance.
    closed = 0

    def close(self):
        # Mark the stream closed *before* raising, so that later close()
        # calls do nothing instead of raising again.
        if not self.closed:
            self.closed = 1
            raise IOError

class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO layered over the C RawIOBase.
    pass

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO layered over the pure-Python RawIOBase.
    pass


class MockFileIO:
    """Mixin that logs the size of every read made on the underlying stream.

    Must be combined with a class that provides __init__(data), read() and
    readinto(); each call is forwarded with super() and its result size is
    appended to ``read_history``.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        res = super(MockFileIO, self).read(n)
        # A None result is logged as None rather than as a length.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super(MockFileIO, self).readinto(b)
        self.read_history.append(res)
        return res

class CMockFileIO(MockFileIO, io.BytesIO):
    # Read-logging mixin over the C BytesIO.
    pass

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # Read-logging mixin over the pure-Python BytesIO.
    pass


class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a non-blocking stream filling up.

    Subclasses must provide a ``BlockingIOError`` attribute naming the
    exception class to raise when the write "blocks".
    """

    def __init__(self):
        # Byte strings accepted so far and not yet taken by pop_written().
        self._write_stack = []
        # Byte that triggers the next simulated block, or None when unarmed.
        self._blocker_char = None

    def pop_written(self):
        """Return everything accepted so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept ``b`` in full, or only up to the armed blocker char.

        When the blocker char occurs in ``b``, the bytes before it are
        recorded, the blocker is disarmed, and BlockingIOError is raised
        with the count of accepted bytes as its third argument.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not in this payload: accept everything below.
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)

class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Raise the C module's BlockingIOError when a write blocks.
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Raise the pure-Python module's BlockingIOError when a write blocks.
    BlockingIOError = pyio.BlockingIOError


class IOTest(unittest.TestCase):
    # Implementation-independent tests of open() and the raw/buffered layers.
    #
    # The io names used below (self.open, self.FileIO, self.BytesIO,
    # self.IOBase, self.RawIOBase, self.BufferedIOBase, self.TextIOBase) are
    # not defined in this class; they must be supplied as attributes by
    # whatever sets up the concrete subclasses.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate script; leaves b"hello world\n" in f.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/seek script; expects f to contain b"hello world\n".
        # The read-to-EOF checks only run when buffered is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek/write/truncate around the LARGE offset.
        # Use assertTrue rather than a bare assert so the precondition is
        # still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # body completes normally or raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an open FileIO must run __del__, close(), flush() in
        # that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for an abstract base class.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the byte length of the array, not its item
        # count.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper on the same descriptor; closing it must not
            # close the descriptor, but must make the wrapper unusable.
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.read(), "egg\n")
            fobj.seek(0)
            fobj.close()
            self.assertRaises(ValueError, fobj.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object can only be reclaimed by the cycle GC.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

class CIOTest(IOTest):
    # NOTE(review): presumably run against the C io implementation; the code
    # that attaches the implementation attributes is not visible here.
    pass

class PyIOTest(IOTest):
    # Same tests as IOTest, except that test_array_writes is skipped for the
    # reason given in the skip message.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)


class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class must also derive from unittest.TestCase
    # and provide self.tp (the buffered type under test), self.MockRawIO and
    # self.CloseFailureIO.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # assertEqual, not the deprecated assertEquals alias.
        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass instance must go through the overridden
        # __del__ and close(); flush() is only expected for writable types.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        # The repr picks up the raw stream's name once it has one.
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)


Antoine Pitrou19690592009-06-12 20:14:08 +0000681class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
682 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000683
Antoine Pitrou19690592009-06-12 20:14:08 +0000684 def test_constructor(self):
685 rawio = self.MockRawIO([b"abc"])
686 bufio = self.tp(rawio)
687 bufio.__init__(rawio)
688 bufio.__init__(rawio, buffer_size=1024)
689 bufio.__init__(rawio, buffer_size=16)
690 self.assertEquals(b"abc", bufio.read())
691 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
692 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
693 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
694 rawio = self.MockRawIO([b"abc"])
695 bufio.__init__(rawio)
696 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000697
Antoine Pitrou19690592009-06-12 20:14:08 +0000698 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000699 for arg in (None, 7):
700 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
701 bufio = self.tp(rawio)
702 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000703 # Invalid args
704 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000705
Antoine Pitrou19690592009-06-12 20:14:08 +0000706 def test_read1(self):
707 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
708 bufio = self.tp(rawio)
709 self.assertEquals(b"a", bufio.read(1))
710 self.assertEquals(b"b", bufio.read1(1))
711 self.assertEquals(rawio._reads, 1)
712 self.assertEquals(b"c", bufio.read1(100))
713 self.assertEquals(rawio._reads, 1)
714 self.assertEquals(b"d", bufio.read1(100))
715 self.assertEquals(rawio._reads, 2)
716 self.assertEquals(b"efg", bufio.read1(100))
717 self.assertEquals(rawio._reads, 3)
718 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000719 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000720 # Invalid args
721 self.assertRaises(ValueError, bufio.read1, -1)
722
723 def test_readinto(self):
724 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
725 bufio = self.tp(rawio)
726 b = bytearray(2)
727 self.assertEquals(bufio.readinto(b), 2)
728 self.assertEquals(b, b"ab")
729 self.assertEquals(bufio.readinto(b), 2)
730 self.assertEquals(b, b"cd")
731 self.assertEquals(bufio.readinto(b), 2)
732 self.assertEquals(b, b"ef")
733 self.assertEquals(bufio.readinto(b), 1)
734 self.assertEquals(b, b"gf")
735 self.assertEquals(bufio.readinto(b), 0)
736 self.assertEquals(b, b"gf")
737
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000738 def test_readlines(self):
739 def bufio():
740 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
741 return self.tp(rawio)
742 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
743 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
744 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
745
Antoine Pitrou19690592009-06-12 20:14:08 +0000746 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000747 data = b"abcdefghi"
748 dlen = len(data)
749
750 tests = [
751 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
752 [ 100, [ 3, 3, 3], [ dlen ] ],
753 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
754 ]
755
756 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000757 rawio = self.MockFileIO(data)
758 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000759 pos = 0
760 for nbytes in buf_read_sizes:
761 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
762 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000763 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000764 self.assertEquals(rawio.read_history, raw_read_sizes)
765
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    # assertIsNone reports the offending value on failure, unlike
    # assertTrue(None is ...).
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())
777
def test_read_past_eof(self):
    # Asking for far more bytes than exist must return everything that is
    # available (7 bytes here) rather than fail.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
783
def test_read_all(self):
    # read() without a size must concatenate all raw chunks.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
789
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
    try:
        # The file holds every byte value 0..255 exactly N times, in random
        # order.  If concurrent readers neither duplicate nor drop data,
        # the per-value counts of everything they read must still be N.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []
            def reader():
                try:
                    # Alternate a read that fits inside the 8-byte buffer
                    # with one that is larger than it.
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let it
                    # propagate in this thread as well.
                    errors.append(exc)
                    raise
            threads = [threading.Thread(target=reader) for _ in range(20)]
            for worker in threads:
                worker.start()
            time.sleep(0.02) # yield
            for worker in threads:
                worker.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                needle = bytes(bytearray([value]))
                self.assertEqual(combined.count(needle), N)
    finally:
        support.unlink(support.TESTFN)
832
def test_misbehaved_io(self):
    # Positioning calls on a buffered object wrapping a MisbehavedRawIO
    # must surface as IOError.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    self.assertRaises(IOError, bufio.seek, 0)
    self.assertRaises(IOError, bufio.tell)
838
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        scenarios = (
            # Simple case: one raw read is enough to satisfy the request.
            [b"x" * n],
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            [b"x" * (n - 1), b"x"],
        )
        for chunks in scenarios:
            rawio = self.MockRawIO(chunks)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
858
859
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Each failed re-initialization must leave the object refusing
        # reads with ValueError.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; make sure it does not outlive
        # the test run.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so that only the cyclic GC can free f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000900
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against pyio.BufferedReader
    # (the tests pick up the class under test through ``self.tp``).
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000903
904
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared tests for buffered writers.  Subclasses set ``tp`` to the class
    # under test; the raw-stream helpers used below (MockRawIO,
    # MockNonBlockWriterIO, MisbehavedRawIO, open, BlockingIOError) are
    # looked up on ``self`` and are not defined in this class.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called repeatedly on a live object.  Invalid
        # buffer sizes must raise ValueError, and a later valid __init__
        # must make the object writable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write so callers
        # can interleave another operation (flush, seek, truncate, ...).
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it does not run past the data.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move away from the current position and then come
        # back to it, once with absolute and once with relative seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Rewinding and overwriting the start must only replace those
        # bytes; later writes continue from the position sought to.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test writes TESTFN to disk; remove it once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-slice the data into a queue of chunks that the worker
            # threads drain concurrently.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-value counts are
            # checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the
        # max_buffer_size DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001120
1121
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Each failed re-initialization must leave the object refusing
        # writes with ValueError.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; make sure it does not outlive
        # the test run.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so that only the cyclic GC can free f.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1159
1160
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against pyio.BufferedWriter
    # (the tests pick up the class under test through ``self.tp``).
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001163
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for the reader/writer pair object.  Subclasses set ``tp``
    # to the class under test; it is always constructed as
    # tp(reader, writer).

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the
        # max_buffer_size DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # An explicit None size behaves like no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint behaves like no argument.  (This used to be
        # a verbatim repeat of the assertion above.)
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write/flush pair must reach the writer as its own chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Only the prefix is checked: peek() may return more than asked.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        # Peeking must not consume data.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair must report a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1281
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
    tp = io.BufferedRWPair
1284
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001287
1288
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for the read/write buffered object.  It inherits both the
    # reader and the writer test suites and adds tests that mix reads,
    # writes, seeks and flushes on one object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        # By now the pending writes must have reached the raw stream as a
        # single chunk.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) is the read primitive under test; it must
        # return the bytes read and advance the position past them.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no size given, use a buffer far larger than the data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the
        # position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument must cut at the logical position.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1443
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the combined reader, writer and random-access suites against
    # io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader check first, then the writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1460
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1463
1464
Christian Heimes1a6387e2008-03-26 12:49:49 +00001465# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1466# properties:
1467# - A single output character can correspond to many bytes of input.
1468# - The number of input bytes to complete the character can be
1469# undetermined until the last input byte is received.
1470# - The number of input bytes can vary depending on previous input.
1471# - A single input byte can correspond to many characters of output.
1472# - The number of output characters can be undetermined until the
1473# last input byte is received.
1474# - The number of output characters can vary depending on previous input.
1475
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A deliberately stateful, buffering decoder for seek/tell tests.

    The input is split into words.  A word either has a fixed length of I
    bytes or, when I is 0, runs up to the next period (empty words, i.e.
    extra periods, are dropped).  Words are handled as follows:
      - 'i' followed by a number sets I, capped at 99; a missing number
        means 0.
      - 'o' followed by a number sets the output length O, capped at 99;
        a missing number means 0.
      - Anything else is emitted followed by a period.  When O is non-zero
        the word is first padded with hyphens or cut so that it is exactly
        O characters long; when O is 0 it is emitted unchanged.
    I and O both start at 1.  A change of I only affects bytes decoded
    after it.  With final=True, whatever is still buffered is processed as
    one last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: one byte in, one character out.
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # I and O are XORed with 1 so that the flags integer is 0 right
        # after reset(); the two are packed as I*100 + O.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        # Inverse of getstate(): unpack the flags and undo the XOR.
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        for b in input:
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif b != '.':
                # variable-length: keep collecting until a period shows up
                self.buffer.append(b)
            elif self.buffer:
                # a period ends the word; periods with nothing buffered
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word: either update I/O (returning '') or
        # return the formatted output for it.
        head = self.buffer[0]
        if head == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
            text = ''
        elif head == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
            text = ''
        else:
            text = self.buffer.decode('ascii')
            if len(text) < self.o:
                # over-pad with hyphens; the slice below trims the excess
                text += '-' * self.o
            if self.o:
                text = text[:self.o]  # truncate to output length
            text += '.'
        self.buffer = bytearray()
        return text

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: only answers for 'test_decoder', and only
        # while codecEnabled is true; otherwise reports "not found" (None).
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1560
1561# Register the previous decoder for testing.
1562# Disabled by default, tests will enable it.
1563codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1564
1565
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', final=True), 'abcd.')
1608
class TextIOWrapperTest(unittest.TestCase):
    # Base tests for TextIOWrapper.  The implementation under test is reached
    # through attributes (self.TextIOWrapper, self.BytesIO, self.BufferedReader,
    # self.BufferedWriter, self.open, self.CloseFailureIO) that this class does
    # not define itself; the C/Py subclasses inherit these tests.
    #
    # Test methods carry comments rather than docstrings so that unittest's
    # verbose output keeps showing the method names.

    def setUp(self):
        # Sample data mixing \r\n, \r and \n line endings, plus the text
        # expected once they are all translated to \n.
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_constructor(self):
        # __init__ may be called again to change encoding / line_buffering,
        # and rejects invalid newline arguments.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        t.__init__(b, encoding="latin1", newline="\r\n")
        self.assertEqual(t.encoding, "latin1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf8", line_buffering=True)
        self.assertEqual(t.encoding, "utf8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_detach(self):
        # detach() hands back the underlying buffer ...
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b)
        self.assertIs(t.detach(), b)

        # ... flushes pending text first, and cannot be called twice.
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        self.assertFalse(r.getvalue())
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        self.assertRaises(ValueError, t.detach)

    def test_repr(self):
        # The repr shows the encoding, and the raw stream's name once set.
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        # With unicode_literals in effect this name is a unicode string.
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)

    def test_line_buffering(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf8")
        self.assertEqual(t.encoding, "utf8")
        t = self.TextIOWrapper(b)
        self.assertIsNotNone(t.encoding)
        codecs.lookup(t.encoding)

    def test_encoding_errors_reading(self):
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def test_encoding_errors_writing(self):
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def test_newlines(self):
        input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

        # (newline argument, lines expected when reading input_lines)
        tests = [
            [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
            ['', input_lines],
            ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
            ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
            ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline() on every line.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def test_newlines_input(self):
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def test_newlines_output(self):
        # newline argument -> bytes expected after writing the sample text
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like the platform's os.linesep.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def test_destructor(self):
        # Dropping the wrapper must close the underlying stream after the
        # pending text has been written to it.
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)

    def test_override_destructor(self):
        # Overridden __del__, close() and flush() are called in that order.
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyTextIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyTextIO, self).close()
            def flush(self):
                record.append(3)
                super(MyTextIO, self).flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.TextIOWrapper(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    # Systematic tests of the text I/O API

    def test_basic_io(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le"
                # "with" closes the file even when an assertion fails, so
                # tearDown can always remove it.
                with self.open(support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with self.open(support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(None), "abc")
                    f.seek(0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        # Helper for test_basic_io: write lines of various lengths to the
        # open file f, recording tell() before each one, then read them back
        # and check that both the positions and the lines match.
        # (enc is accepted but not used here.)
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = []
            for i in range(size):
                chars.append(sample[i % len(sample)])
            line = "".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def test_telling(self):
        with self.open(support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, "\xff\n")
                # tell() is refused while the file is being iterated over.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def test_seeking(self):
        # Place a multi-byte character right around the default chunk size
        # and check that read(), tell() and readline() still agree.
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        with self.open(support.TESTFN, "wb") as f:
            f.write(line*2)
        # The reading handle used to be left open; close it deterministically.
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, prefix.decode("ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def test_seeking_too(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with self.open(support.TESTFN, "wb") as f:
            f.write(data)
        # The reading handle used to be left open; close it deterministically.
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def test_seek_and_tell(self):
        # Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            with self.open(support.TESTFN, 'wb') as f:
                f.write(data)
            with self.open(support.TESTFN, encoding='test_decoder') as f:
                f._CHUNK_SIZE = CHUNK_SIZE
                decoded = f.read()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    with self.open(support.TESTFN,
                                   encoding='test_decoder') as f:
                        self.assertEqual(f.read(i), decoded[:i])
                        cookie = f.tell()
                        self.assertEqual(f.read(j), decoded[i:i + j])
                        f.seek(cookie)
                        self.assertEqual(f.read(), decoded[i:])

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = True

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = False

    def test_encoded_writes(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))

    def test_unreadable(self):
        # Reading through a wrapper whose stream is not readable raises IOError.
        class UnReadable(self.BytesIO):
            def readable(self):
                return False
        txt = self.TextIOWrapper(UnReadable())
        self.assertRaises(IOError, txt.read)

    def test_read_one_by_one(self):
        # \r\n must still be translated when reading one character at a time.
        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "AA\nBB")

    def test_readlines(self):
        # readlines() accepts no argument, None, or a size hint.
        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def test_read_by_chunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_2(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # read four chars at a time, matching the chunk size
        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_3(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # mix fixed-size reads with readline()
        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_4(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # a fixed-size read followed by reading everything that is left
        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_5(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # Only the position reached after this read matters, not its result.
        txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")

    def test_issue2282(self):
        # The wrapper reports the same seekability as the wrapped stream.
        buffer = self.BytesIO(self.testdata)
        txt = self.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))

    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))

    def test_errors_property(self):
        # The errors attribute reflects the argument, "strict" when omitted.
        with self.open(support.TESTFN, "w") as f:
            self.assertEqual(f.errors, "strict")
        with self.open(support.TESTFN, "w", errors="replace") as f:
            self.assertEqual(f.errors, "replace")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(support.TESTFN, "w", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                event.wait()
                f.write(text)
            threads = [threading.Thread(target=lambda n=x: run(n))
                       for x in range(20)]
            for t in threads:
                t.start()
            # Give the threads time to block on the event, then release
            # them all at once.
            time.sleep(0.02)
            event.set()
            for t in threads:
                t.join()
        with self.open(support.TESTFN) as f:
            content = f.read()
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)

    def test_flush_error_on_close(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        def bad_flush():
            raise IOError()
        txt.flush = bad_flush
        self.assertRaises(IOError, txt.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing a closed wrapper is not.
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt.close()
        txt.close()
        txt.close()
        self.assertRaises(ValueError, txt.flush)
2187
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds two tests of its own on top of the inherited TextIOWrapperTest
    # cases.

    def test_initialization(self):
        # Every failed re-initialisation must leave the wrapper in a state
        # where read() raises ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Build a reference cycle so only the cyclic collector can free it.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as stream:
            self.assertEqual(stream.read(), b"456def")
2214
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapperTest cases unchanged and adds none of
    # its own.  NOTE(review): presumably bound to the pure-Python
    # implementation elsewhere in this file -- confirm.
    pass
2217
2218
2219class IncrementalNewlineDecoderTest(unittest.TestCase):
2220
def check_newline_decoding_utf8(self, decoder):
    """Run UTF-8 specific checks against a newline-translating *decoder*.

    *decoder* is driven with UTF-8 bytes, including multi-byte characters
    split across calls and \\r / \\r\\n sequences split across calls.
    """
    # UTF-8 specific tests for a newline decoder
    def _check_decode(b, s, **kwargs):
        # We exercise getstate() / setstate() as well as decode()
        state = decoder.getstate()
        self.assertEqual(decoder.decode(b, **kwargs), s)
        decoder.setstate(state)
        self.assertEqual(decoder.decode(b, **kwargs), s)

    _check_decode(b'\xe8\xa2\x88', "\u8888")

    # A character fed one byte at a time only appears once complete.
    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    # A truncated character at end of input is an error.
    _check_decode(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    # A lone \r is held back until the decoder knows what follows it.
    _check_decode(b'\n', "\n")
    _check_decode(b'\r', "")
    _check_decode(b'', "\n", final=True)
    _check_decode(b'\r', "\n", final=True)

    _check_decode(b'\r', "")
    _check_decode(b'a', "\na")

    _check_decode(b'\r\r\n', "\n\n")
    _check_decode(b'\r', "")
    _check_decode(b'\r', "\n")
    _check_decode(b'\na', "\na")

    _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
    _check_decode(b'\xe8\xa2\x88', "\u8888")
    _check_decode(b'\n', "\n")
    _check_decode(b'\xe8\xa2\x88\r', "\u8888")
    _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002262
def check_newline_decoding(self, decoder, encoding):
    """Check newline translation and the ``newlines`` attribute of *decoder*.

    Text is fed to *decoder* one unit at a time.  When *encoding* is not
    None the text is first encoded with it and fed item by item; when it
    is None the decoder is fed the text itself, one character at a time.
    """
    result = []
    if encoding is not None:
        encoder = codecs.getincrementalencoder(encoding)()
        def _decode_bytewise(s):
            # Decode one byte at a time
            for b in encoder.encode(s):
                result.append(decoder.decode(b))
    else:
        encoder = None
        def _decode_bytewise(s):
            # Decode one char at a time
            for c in s:
                result.append(decoder.decode(c))
    # newlines accumulates the kinds of line ending seen so far.
    self.assertEqual(decoder.newlines, None)
    _decode_bytewise("abc\n\r")
    self.assertEqual(decoder.newlines, '\n')
    _decode_bytewise("\nabc")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc")
    self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
    # reset() must forget both the pending \r and the newlines seen.
    decoder.reset()
    input = "abc"
    if encoder is not None:
        encoder.reset()
        input = encoder.encode(input)
    self.assertEqual(decoder.decode(input), "abc")
    self.assertEqual(decoder.newlines, None)
2295
def test_newline_decoder(self):
    # Run the generic newline check once per encoding, then the
    # UTF-8-specific check on a fresh decoder.
    encodings = (
        # None meaning the IncrementalNewlineDecoder takes unicode input
        # rather than bytes input
        None, 'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for enc in encodings:
        if enc is None:
            inner = None
        else:
            inner = codecs.getincrementaldecoder(enc)()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding(wrapped, enc)
    utf8_inner = codecs.getincrementaldecoder("utf-8")()
    wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
    self.check_newline_decoding_utf8(wrapped)
2311
def test_newline_bytes(self):
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    # U+0D00 and U+0A00 must pass through unchanged and must not be
    # recorded as newlines, with or without translation.
    def _check(dec):
        self.assertEqual(dec.newlines, None)
        self.assertEqual(dec.decode("\u0D00"), "\u0D00")
        self.assertEqual(dec.newlines, None)
        self.assertEqual(dec.decode("\u0A00"), "\u0A00")
        self.assertEqual(dec.newlines, None)
    for translate in (False, True):
        _check(self.IncrementalNewlineDecoder(None, translate=translate))
2324
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant for the ``io`` module.

    Adds nothing itself; test_main() copies the ``io`` module's names onto
    every test class whose name starts with "C".
    """
2327
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant for the ``pyio`` module.

    Adds nothing itself; test_main() copies the ``pyio`` module's names
    onto every test class whose name starts with "Py".
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002330
Christian Heimes1a6387e2008-03-26 12:49:49 +00002331
2332# XXX Tests for open()
2333
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on the public surface of an io implementation.

    The names reached through ``self`` (``open``, ``IOBase``,
    ``BlockingIOError`` and so on) are not defined here: the subclasses set
    ``io`` and test_main() copies the remaining module names onto them.
    Test methods use comments rather than docstrings so unittest keeps
    reporting them by method name.
    """

    def tearDown(self):
        # Every test writes to the shared scratch file; remove it afterwards.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist and, apart from open(), the
        # exceptions and the SEEK_* constants, be an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Each file is opened in a with-block so it is closed even when an
        # assertion fails part-way through.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        # The file now exists, so it can be reopened for reading.
        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second object on the same descriptor; closefd=False leaves
            # ownership of the descriptor with f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Once closed, every I/O method must raise ValueError, whatever the
        # mode or buffering the file was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto only exist on some layers of the stack.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass the argument type matching the mode so that the closed
            # state, not a type mismatch, is what gets reported.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        # Build a reference cycle through the exception and check that the
        # garbage collector can reclaim it.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # Open the scratch file unbuffered, buffered and in text mode, and
        # check each object against the base classes taken from abcmodule.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2469
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module."""

    # Module whose __all__ is inspected by test___all__.
    io = io
2472
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    # Module whose __all__ is inspected by test___all__.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002475
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002476
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check that writes interrupted by a signal run the Python handler.

    Subclasses provide the ``io`` attribute naming the implementation to
    test.
    """

    def setUp(self):
        # Install our handler and remember the previous one for tearDown.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError so the test can recognise
        # that this handler ran.
        1 // 0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        item          -- two-element value repeated to build the large
                         payload that is written.
        bytes         -- the byte string expected to come out of the pipe;
                         its first two bytes are compared with what is read.
        fdopen_kwargs -- passed through to self.io.open() for the pipe's
                         write end.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound up front so the cleanup below works even if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            try:
                self.assertRaises(ZeroDivisionError,
                                  wio.write, item * (1024 * 1024))
            finally:
                # Never leave the alarm armed: once tearDown() restores the
                # previous handler a late SIGALRM would hit unrelated code.
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    # The descriptor is already closed; anything other than
                    # EBADF is unexpected.
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2537
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the ``io`` module."""

    # Implementation used by check_interrupted_write() to open the pipe.
    io = io
2540
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the ``pyio`` module."""

    # Implementation used by check_interrupted_write() to open the pipe.
    io = pyio
2543
2544
def test_main():
    """Attach the right io namespace to each test class, then run them all.

    Classes whose name starts with "C" receive the names of ``io``; those
    starting with "Py" receive the names of ``pyio``.  Any other class is
    run unchanged.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    c_namespace = {}
    py_namespace = {}
    for attr in exported:
        c_namespace[attr] = getattr(io, attr)
        py_namespace[attr] = getattr(pyio, attr)
    # Each mock is looked up under its "C"/"Py" prefixed name but stored
    # under the unprefixed name, so tests can simply say self.MockRawIO.
    for mock in mocks:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_namespace
        elif test.__name__.startswith("Py"):
            namespace = py_namespace
        else:
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002579
# Run the full suite when this file is executed directly.
if __name__ == "__main__":
    test_main()