blob: 6132bae82f3a27a671a515da3b8ad093d08d23d0 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020037from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000038from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000039
40import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000051
# Module-level __metaclass__: under Python 2, classes defined below without
# explicit bases are created with `type`, i.e. as new-style classes, which is
# what lets the mock classes call super().
__metaclass__ = type
# Rebind `bytes` to the test-support helper; the write() methods of the mock
# raw IO classes below convert their argument through this name.
# NOTE(review): presumably a Python 3-like bytes() constructor -- confirm
# against test_support.py3k_bytes.
bytes = support.py3k_bytes
54
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a freshly opened text-mode file object."""
    # Any readable file will do; this module's own source is always there.
    text_file = io.open(__file__, "r", encoding="latin1")
    try:
        chunk_size = text_file._CHUNK_SIZE
    finally:
        text_file.close()
    return chunk_size
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    read_stack is an iterable of chunks that successive readinto() calls hand
    out in order; a None entry makes the corresponding call return None.
    Chunks passed to write() are collected in _write_stack.  _reads counts
    the readinto() calls and _extraneous_reads counts those made once the
    stack was empty.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record b (converted through bytes) and report it fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # whence defaults to 0, matching the IOBase.seek() signature, so that
        # seek(pos) works on the mock instead of raising TypeError.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # equally wrong, see seek()

    def readinto(self, buf):
        """Copy the next queued chunk into buf.

        Returns the number of bytes stored, None when the next stack entry is
        None, or 0 when the stack is exhausted.  A chunk larger than buf is
        split: the part that does not fit stays at the head of the stack.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Fill buf completely and keep the remainder for the next call.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
122
123
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that pops whole chunks off the
    read stack, regardless of the requested size."""

    def read(self, n=None):
        """Return the next queued chunk, or b"" once the stack is empty.

        n is accepted for interface compatibility but is not used.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack counts as an extraneous read; any other
            # error (or KeyboardInterrupt) must propagate, not be masked.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000139
140
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods return values that do not match what was
    actually done (doubled results, negative positions, oversized counts)."""

    def write(self, b):
        written = MockRawIO.write(self, b)
        return written * 2

    def read(self, n=None):
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then report five times the buffer size.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
163
164
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() marks it closed and raises IOError;
    later close() calls do nothing."""

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
178
179
class MockFileIO:
    """Mixin that logs the result size of every read()/readinto() call in
    read_history before handing the result back."""

    def __init__(self, data):
        # The history must exist before the base initializer runs.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count


class CMockFileIO(MockFileIO, io.BytesIO):
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
201
202
class MockNonBlockWriterIO:
    """Writable mock that collects written chunks and can be told to stop
    short (or return None) when a chosen byte shows up in the data."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far, joined, and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept b up to the blocker char, if one is set and present.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker char (which also cancels the blocker).
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = data.find(blocker)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
252
Christian Heimes1a6387e2008-03-26 12:49:49 +0000253
class IOTest(unittest.TestCase):
    # Generic tests of the io API.  The implementation under test is reached
    # through attributes of self (open, FileIO, BytesIO, IOBase, ...) that are
    # not defined in this class.
    # NOTE(review): presumably the C and Python variants are bound to the
    # subclasses elsewhere in the file -- confirm.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Exercise write(), seek(), tell() and truncate() on the writable
        # binary stream f, leaving b"hello world\n" in it.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must leave the stream position alone.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Exercise read(), readinto(), seek() and tell() on the readable
        # binary stream f, which must contain b"hello world\n".  With
        # buffered=True, argument-less read() calls are checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek, write, truncate and read around the LARGE offset of f.
        # unittest assertions are used instead of bare assert statements,
        # which are stripped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary files.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Binary files opened with the default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip (as test_unbounded_file does) rather than
                # returning silently, which would count as a pass.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run them"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block finished normally or raised.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying a FileIO subclass must call __del__, close() and flush()
        # in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for a subclass of the
        # abstract class `base`.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the byte length of an array, raw or buffered.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Named fobj so the Python 2 builtin `file` is not shadowed.
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.read(), "egg\n")
            fobj.seek(0)
            fobj.close()
            self.assertRaises(ValueError, fobj.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Put the object in a reference cycle so only the GC can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards must fail.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
609
610
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # A throwaway instance populates the method cache.
        MyIO()
        cyclic = MyIO()
        # Make the instance refer to itself so only the GC can reclaim it.
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        del MyIO
        del cyclic
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000630
class PyIOTest(IOTest):

    # Override the inherited test with one that is always skipped.
    @unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    def test_array_writes(self):
        return IOTest.test_array_writes(self)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000635
636
class CommonBufferedTests:
    # Shared by the BufferedReader, BufferedWriter and BufferedRandom tests.
    # The buffered class under test is self.tp; the raw mocks are reached
    # through self.MockRawIO and self.CloseFailureIO.

    def test_detach(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        # A second detach() has nothing left to hand out.
        self.assertRaises(ValueError, buffered.detach)

    def test_fileno(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # Destruction must go through the overridden __del__ and close(),
        # and through flush() too when the object is writable.
        base = self.tp
        events = []
        class MyBufferedIO(base):
            def __del__(self):
                events.append(1)
                try:
                    parent_del = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                events.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                events.append(3)
                super(MyBufferedIO, self).flush()
        raw_stream = self.MockRawIO()
        buffered = MyBufferedIO(raw_stream)
        is_writable = buffered.writable()
        del buffered
        support.gc_collect()
        expected = [1, 2, 3] if is_writable else [1, 2]
        self.assertEqual(events, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        def enter_and_leave():
            with buffered:
                pass
        enter_and_leave()
        # Leaving the block closed the object, so entering it again must
        # raise a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw_stream = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(raw_stream).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = captured.getvalue().strip()
        if message:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(message.splitlines()), 1)
            self.assertTrue(message.startswith("Exception IOError: "), message)
            self.assertTrue(message.endswith(" ignored"), message)

    def test_repr(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        # Once the raw stream has a name, the repr shows it.
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name=u'dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()
        def failing_flush():
            raise IOError()
        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        # The flush error must surface from close(), not be swallowed ...
        self.assertRaises(IOError, buffered.close)
        # ... and the object must end up closed nevertheless.
        self.assertTrue(buffered.closed)

    def test_close_error_on_close(self):
        raw_stream = self.MockRawIO()
        def failing_flush():
            raise IOError('flush')
        def failing_close():
            raise IOError('close')
        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        # With both flush() and close() failing, the error raised by
        # close() is the one that must come out.
        with self.assertRaises(IOError) as caught:
            buffered.close()
        self.assertEqual(caught.exception.args, ('close',))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Closing more than once is allowed.
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_readonly_attributes(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        replacement = self.MockRawIO()
        # Rebinding .raw must be refused.
        self.assertRaises((AttributeError, TypeError),
                          setattr, buffered, "raw", replacement)
766
Christian Heimes1a6387e2008-03-26 12:49:49 +0000767
class SizeofTest:

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() of a buffered object must grow by exactly the
        # difference between the two buffer sizes.
        small, large = 4096, 8192
        small_raw = self.MockRawIO()
        small_buffered = self.tp(small_raw, buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_raw = self.MockRawIO()
        large_buffered = self.tp(large_raw, buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)
779 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
780
781
Antoine Pitrou19690592009-06-12 20:14:08 +0000782class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
783 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000784
def test_constructor(self):
    raw_stream = self.MockRawIO([b"abc"])
    reader = self.tp(raw_stream)
    # Calling __init__ again with valid sizes must leave a usable reader.
    reader.__init__(raw_stream)
    reader.__init__(raw_stream, buffer_size=1024)
    reader.__init__(raw_stream, buffer_size=16)
    self.assertEqual(b"abc", reader.read())
    # Zero and negative buffer sizes are rejected.
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, reader.__init__, raw_stream,
                          buffer_size=bad_size)
    # The reader can still be re-initialized on a fresh raw stream.
    raw_stream = self.MockRawIO([b"abc"])
    reader.__init__(raw_stream)
    self.assertEqual(b"abc", reader.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000798
Antoine Pitrou19690592009-06-12 20:14:08 +0000799 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000800 for arg in (None, 7):
801 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
802 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000803 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000804 # Invalid args
805 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000806
Antoine Pitrou19690592009-06-12 20:14:08 +0000807 def test_read1(self):
808 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
809 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000810 self.assertEqual(b"a", bufio.read(1))
811 self.assertEqual(b"b", bufio.read1(1))
812 self.assertEqual(rawio._reads, 1)
813 self.assertEqual(b"c", bufio.read1(100))
814 self.assertEqual(rawio._reads, 1)
815 self.assertEqual(b"d", bufio.read1(100))
816 self.assertEqual(rawio._reads, 2)
817 self.assertEqual(b"efg", bufio.read1(100))
818 self.assertEqual(rawio._reads, 3)
819 self.assertEqual(b"", bufio.read1(100))
820 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000821 # Invalid args
822 self.assertRaises(ValueError, bufio.read1, -1)
823
824 def test_readinto(self):
825 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
826 bufio = self.tp(rawio)
827 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000828 self.assertEqual(bufio.readinto(b), 2)
829 self.assertEqual(b, b"ab")
830 self.assertEqual(bufio.readinto(b), 2)
831 self.assertEqual(b, b"cd")
832 self.assertEqual(bufio.readinto(b), 2)
833 self.assertEqual(b, b"ef")
834 self.assertEqual(bufio.readinto(b), 1)
835 self.assertEqual(b, b"gf")
836 self.assertEqual(bufio.readinto(b), 0)
837 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000838
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000839 def test_readlines(self):
840 def bufio():
841 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
842 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000843 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
844 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
845 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000846
Antoine Pitrou19690592009-06-12 20:14:08 +0000847 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000848 data = b"abcdefghi"
849 dlen = len(data)
850
851 tests = [
852 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
853 [ 100, [ 3, 3, 3], [ dlen ] ],
854 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
855 ]
856
857 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000858 rawio = self.MockFileIO(data)
859 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000860 pos = 0
861 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000862 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000863 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000864 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000865 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000866
Antoine Pitrou19690592009-06-12 20:14:08 +0000867 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000868 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000869 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
870 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000871 self.assertEqual(b"abcd", bufio.read(6))
872 self.assertEqual(b"e", bufio.read(1))
873 self.assertEqual(b"fg", bufio.read())
874 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200875 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000876 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000877
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200878 rawio = self.MockRawIO((b"a", None, None))
879 self.assertEqual(b"a", rawio.readall())
880 self.assertIsNone(rawio.readall())
881
Antoine Pitrou19690592009-06-12 20:14:08 +0000882 def test_read_past_eof(self):
883 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
884 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000885
Ezio Melotti2623a372010-11-21 13:34:58 +0000886 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000887
Antoine Pitrou19690592009-06-12 20:14:08 +0000888 def test_read_all(self):
889 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
890 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000891
Ezio Melotti2623a372010-11-21 13:34:58 +0000892 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000893
Victor Stinner6a102812010-04-27 23:55:59 +0000894 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000895 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000896 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000897 try:
898 # Write out many bytes with exactly the same number of 0's,
899 # 1's... 255's. This will help us check that concurrent reading
900 # doesn't duplicate or forget contents.
901 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000902 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000903 random.shuffle(l)
904 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000905 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000906 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000907 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000908 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000909 errors = []
910 results = []
911 def f():
912 try:
913 # Intra-buffer read then buffer-flushing read
914 for n in cycle([1, 19]):
915 s = bufio.read(n)
916 if not s:
917 break
918 # list.append() is atomic
919 results.append(s)
920 except Exception as e:
921 errors.append(e)
922 raise
923 threads = [threading.Thread(target=f) for x in range(20)]
924 for t in threads:
925 t.start()
926 time.sleep(0.02) # yield
927 for t in threads:
928 t.join()
929 self.assertFalse(errors,
930 "the following exceptions were caught: %r" % errors)
931 s = b''.join(results)
932 for i in range(256):
933 c = bytes(bytearray([i]))
934 self.assertEqual(s.count(c), N)
935 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000936 support.unlink(support.TESTFN)
937
938 def test_misbehaved_io(self):
939 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
940 bufio = self.tp(rawio)
941 self.assertRaises(IOError, bufio.seek, 0)
942 self.assertRaises(IOError, bufio.tell)
943
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000944 def test_no_extraneous_read(self):
945 # Issue #9550; when the raw IO object has satisfied the read request,
946 # we should not issue any additional reads, otherwise it may block
947 # (e.g. socket).
948 bufsize = 16
949 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
950 rawio = self.MockRawIO([b"x" * n])
951 bufio = self.tp(rawio, bufsize)
952 self.assertEqual(bufio.read(n), b"x" * n)
953 # Simple case: one raw read is enough to satisfy the request.
954 self.assertEqual(rawio._extraneous_reads, 0,
955 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
956 # A more complex case where two raw reads are needed to satisfy
957 # the request.
958 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
959 bufio = self.tp(rawio, bufsize)
960 self.assertEqual(bufio.read(n), b"x" * n)
961 self.assertEqual(rawio._extraneous_reads, 0,
962 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
963
964
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    """BufferedReaderTest bound to io.BufferedReader, plus C-only checks."""

    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # After every failed re-initialization the object must refuse to
        # read instead of operating on a half-initialized state.
        for size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # checking this there is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The test creates support.TESTFN; make sure it is removed afterwards
        # whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so that only the cyclic GC can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001005
class PyBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest bound to pyio.BufferedReader."""

    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001008
1009
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Writer-side tests; subclasses set ``tp`` to the class under test."""

    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Calling __init__ again with valid arguments keeps the object usable.
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        for size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func(bufio)` is invoked after every single write.
        contents = bytes(range(256)) * 1000
        total = len(contents)
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        pos = 0
        while pos < total:
            size = min(next(sizes), total - pos)
            self.assertEqual(bufio.write(contents[pos:pos + size]), size)
            intermediate_func(bufio)
            pos += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def seek_absolute(bufio):
            # Move forward, back, then return to the original position.
            here = bufio.tell()
            for target in (here + 1, here - 1, here):
                bufio.seek(target, 0)
        self.check_writes(seek_absolute)

        def seek_relative(bufio):
            here = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(here, 0)
        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        with self.assertRaises(self.BlockingIOError) as caught:
            bufio.write(b"opqrwxyz0123456789")
        self.assertEqual(caught.exception.characters_written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        # By now the overwrite of the first two bytes has reached `raw`.
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() must accept a UserList, not only a builtin list.
        lines = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        # Non-bytes items and a non-iterable argument both raise TypeError.
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates support.TESTFN; remove it afterwards so it does
        # not leak into other tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            pos = 0
            queue = deque()
            # Pre-slice the payload into chunks of alternating sizes; the
            # worker threads drain this queue.
            while pos < len(contents):
                size = next(sizes)
                queue.append(contents[pos:pos + size])
                pos += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=writer) for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02)  # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(
                    errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                written = f.read()
            # Order is unspecified, but every byte value must appear
            # exactly N times.
            for i in range(256):
                self.assertEqual(written.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument must emit a
        # DeprecationWarning.
        expected = ("max_buffer_size is deprecated", DeprecationWarning)
        with support.check_warnings(expected):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise IOError()

        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close)  # exception not swallowed
        # The object is still marked closed despite the failed flush.
        self.assertTrue(b.closed)
1258
Antoine Pitrou19690592009-06-12 20:14:08 +00001259
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    """BufferedWriterTest bound to io.BufferedWriter, plus C-only checks."""

    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After every failed re-initialization the object must refuse to
        # write instead of operating on a half-initialized state.
        for size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The test creates support.TESTFN; make sure it is removed afterwards
        # whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so that only the cyclic GC can free it.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1297
1298
class PyBufferedWriterTest(BufferedWriterTest):
    """BufferedWriterTest bound to pyio.BufferedWriter."""

    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001301
class BufferedRWPairTest(unittest.TestCase):
    """BufferedRWPair tests; subclasses set ``tp`` to the class under test."""

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the fourth positional argument must emit a
        # DeprecationWarning.
        expected = ("max_buffer_size is deprecated", DeprecationWarning)
        with support.check_warnings(expected):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) reads everything, like read() without an argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        every_line = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), every_line)
        # An explicit None hint must behave like no hint at all (this
        # mirrors BufferedReaderTest.test_readlines).
        self.assertEqual(make_pair().readlines(None), every_line)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each write followed by a flush reaches the writer as its own chunk.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested but must not consume data.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        tty_combinations = ((True, False), (False, True), (True, True))
        for reader_tty, writer_tty in tty_combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            self.assertTrue(pair.isatty())
1419
class CBufferedRWPairTest(BufferedRWPairTest):
    """BufferedRWPairTest bound to io.BufferedRWPair."""

    tp = io.BufferedRWPair
1422
class PyBufferedRWPairTest(BufferedRWPairTest):
    """BufferedRWPairTest bound to pyio.BufferedRWPair."""

    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001425
1426
Antoine Pitrou19690592009-06-12 20:14:08 +00001427class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1428 read_mode = "rb+"
1429 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001430
Antoine Pitrou19690592009-06-12 20:14:08 +00001431 def test_constructor(self):
1432 BufferedReaderTest.test_constructor(self)
1433 BufferedWriterTest.test_constructor(self)
1434
1435 def test_read_and_write(self):
1436 raw = self.MockRawIO((b"asdf", b"ghjk"))
1437 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001438
1439 self.assertEqual(b"as", rw.read(2))
1440 rw.write(b"ddd")
1441 rw.write(b"eee")
1442 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001443 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001444 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001445
Antoine Pitrou19690592009-06-12 20:14:08 +00001446 def test_seek_and_tell(self):
1447 raw = self.BytesIO(b"asdfghjkl")
1448 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001449
Ezio Melotti2623a372010-11-21 13:34:58 +00001450 self.assertEqual(b"as", rw.read(2))
1451 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001452 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001453 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001454
Antoine Pitrou808cec52011-08-20 15:40:58 +02001455 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001456 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001457 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001458 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001459 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001460 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001461 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001462 self.assertEqual(7, rw.tell())
1463 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001464 rw.flush()
1465 self.assertEqual(b"asdf123fl", raw.getvalue())
1466
Christian Heimes1a6387e2008-03-26 12:49:49 +00001467 self.assertRaises(TypeError, rw.seek, 0.0)
1468
Antoine Pitrou19690592009-06-12 20:14:08 +00001469 def check_flush_and_read(self, read_func):
1470 raw = self.BytesIO(b"abcdefghi")
1471 bufio = self.tp(raw)
1472
Ezio Melotti2623a372010-11-21 13:34:58 +00001473 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001474 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001475 self.assertEqual(b"ef", read_func(bufio, 2))
1476 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001477 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001478 self.assertEqual(6, bufio.tell())
1479 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001480 raw.seek(0, 0)
1481 raw.write(b"XYZ")
1482 # flush() resets the read buffer
1483 bufio.flush()
1484 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001485 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001486
1487 def test_flush_and_read(self):
1488 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1489
def test_flush_and_readinto(self):
    # Adapt readinto() to the read(size) calling convention expected by
    # check_flush_and_read().
    def _readinto(stream, size=-1):
        if size < 0:
            # No size requested: use a buffer of 9999 bytes.
            size = 9999
        target = bytearray(size)
        filled = stream.readinto(target)
        return bytes(target[:filled])
    self.check_flush_and_read(_readinto)
1496
def test_flush_and_peek(self):
    # Emulate read(size) with peek() followed by a relative seek.
    def _peek(stream, size=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        chunk = stream.peek(size)
        if size != -1:
            chunk = chunk[:size]
        # Consume what was peeked so the position advances like a read.
        stream.seek(len(chunk), 1)
        return chunk
    self.check_flush_and_read(_peek)
1507
def test_flush_and_write(self):
    # Two writes, each followed by a flush, must land back to back at the
    # start of the underlying data.
    backing = self.BytesIO(b"abcdefghi")
    stream = self.tp(backing)

    for piece in (b"123", b"45"):
        stream.write(piece)
        stream.flush()
    stream.seek(0, 0)
    self.assertEqual(b"12345fghi", backing.getvalue())
    self.assertEqual(b"12345fghi", stream.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001519
def test_threads(self):
    # Run both the reader and the writer threading tests on this type.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_threads(self)
1523
def test_writes_and_peek(self):
    # Hand two different peek()-based callbacks to check_writes().
    def _peek_ahead(stream):
        stream.peek(1)
    self.check_writes(_peek_ahead)

    def _peek_behind(stream):
        # Step back one byte, peek, then restore the saved position.
        saved = stream.tell()
        stream.seek(-1, 1)
        stream.peek(1)
        stream.seek(saved, 0)
    self.check_writes(_peek_behind)
1534
def test_writes_and_reads(self):
    # Callback for check_writes(): step back one byte and read() it again.
    def _reread_previous(stream):
        stream.seek(-1, 1)
        stream.read(1)
    self.check_writes(_reread_previous)
1540
def test_writes_and_read1s(self):
    # Callback for check_writes(): step back one byte and read1() it again.
    def _read1_previous(stream):
        stream.seek(-1, 1)
        stream.read1(1)
    self.check_writes(_read1_previous)
1546
def test_writes_and_readintos(self):
    # Callback for check_writes(): step back one byte and readinto() a
    # one-byte buffer.
    def _readinto_previous(stream):
        stream.seek(-1, 1)
        one_byte = bytearray(1)
        stream.readinto(one_byte)
    self.check_writes(_readinto_previous)
1552
def test_write_after_readahead(self):
    # Issue #6629: writing after the buffer was filled by readahead should
    # first rewind the raw stream.
    for count in [1, 5]:
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 4)
        position_after_write = count + 1
        # Trigger readahead
        self.assertEqual(stream.read(1), b"A")
        self.assertEqual(stream.tell(), 1)
        # Overwriting should rewind the raw stream if it needs so
        stream.write(b"B" * count)
        self.assertEqual(stream.tell(), position_after_write)
        # If the write size was smaller than the buffer size, flush() and
        # check that rewind happens.
        stream.flush()
        self.assertEqual(stream.tell(), position_after_write)
        contents = backing.getvalue()
        self.assertEqual(contents,
                         b"A" + b"B" * count + b"A" * (9 - count))
1572
def test_write_rewind_write(self):
    # Various combinations of reading / writing / seeking backwards / writing again
    def _write_twice(stream, first, second):
        assert second >= first
        # Fill the buffer by reading from `first` up to `second`.
        stream.seek(first)
        stream.read(second - first)
        stream.write(b'\x02')
        # This writes earlier than the previous write, but still inside
        # the buffer.
        stream.seek(first)
        stream.write(b'\x01')

    original = b"\x80\x81\x82\x83\x84"
    size = len(original)
    for i in range(size):
        for j in range(i, size):
            backing = self.BytesIO(original)
            stream = self.tp(backing, 100)
            _write_twice(stream, i, j)
            stream.flush()
            # Position i is written last, so it wins when i == j.
            expected = bytearray(original)
            expected[j] = 2
            expected[i] = 1
            self.assertEqual(backing.getvalue(), expected,
                             "failed result for i=%d, j=%d" % (i, j))
1598
def test_truncate_after_read_or_write(self):
    # truncate() with no argument must use the logical stream position,
    # both after a buffered read and after a buffered write.
    backing = self.BytesIO(b"A" * 10)
    stream = self.tp(backing, 100)
    # the read buffer gets filled
    self.assertEqual(stream.read(2), b"AA")
    self.assertEqual(stream.truncate(), 2)
    # the write buffer increases
    self.assertEqual(stream.write(b"BB"), 2)
    self.assertEqual(stream.truncate(), 4)
1606
def test_misbehaved_io(self):
    # Run both the reader and the writer misbehaved-IO tests on this type.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_misbehaved_io(self)
1610
def test_interleaved_read_write(self):
    # Test for issue #12213
    with self.BytesIO(b'abcdefgh') as backing, \
            self.tp(backing, 100) as stream:
        # Alternate one-byte writes with each kind of one-byte read.
        stream.write(b"1")
        self.assertEqual(stream.read(1), b'b')
        stream.write(b'2')
        self.assertEqual(stream.read1(1), b'd')
        stream.write(b'3')
        target = bytearray(1)
        stream.readinto(target)
        self.assertEqual(target, b'f')
        stream.write(b'4')
        self.assertEqual(stream.peek(1), b'h')
        stream.flush()
        self.assertEqual(backing.getvalue(), b'1b2d3f4h')

    with self.BytesIO(b'abc') as backing, \
            self.tp(backing, 100) as stream:
        # Same idea, but starting with a read instead of a write.
        self.assertEqual(stream.read(1), b'a')
        stream.write(b"2")
        self.assertEqual(stream.read(1), b'c')
        stream.flush()
        self.assertEqual(backing.getvalue(), b'a2c')
1635
def test_interleaved_readline_write(self):
    # Each step overwrites one byte and then reads the rest of that line.
    steps = (
        (b'1', b'b\n'),
        (b'2', b'def\n'),
        (b'3', b'\n'),
    )
    with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
            self.tp(backing) as stream:
        for written, remainder in steps:
            stream.write(written)
            self.assertEqual(stream.readline(), remainder)
        stream.flush()
        self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')
1647
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Type under test for the inherited test methods.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel.
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        failures = (OverflowError, MemoryError, ValueError)
        # Re-initialising with a sys.maxsize buffer must be refused.
        self.assertRaises(failures, buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C parent test classes.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1665
# Runs the BufferedRandomTest methods with pyio.BufferedRandom as the
# type under test.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1668
1669
Christian Heimes1a6387e2008-03-26 12:49:49 +00001670# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1671# properties:
1672# - A single output character can correspond to many bytes of input.
1673# - The number of input bytes to complete the character can be
1674# undetermined until the last input byte is received.
1675# - The number of input bytes can vary depending on previous input.
1676# - A single input byte can correspond to many characters of output.
1677# - The number of output characters can be undetermined until the
1678# last input byte is received.
1679# - The number of output characters can vary depending on previous input.
1680
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume *input* and return the text of every completed word.

        Incomplete trailing input stays in self.buffer until a later call
        completes it, or until *final* is true.
        """
        output = ''
        period = ord('.')
        # Iterate over a bytearray so every item is an int, whatever
        # iterating the input object itself would yield.  process_word()
        # compares buffer items against ord() values in the same way.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words only update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo using this class as the incremental decoder
        when the codec is enabled and *name* matches, otherwise None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1765
# Register StatefulIncrementalDecoder's search function with the codecs
# machinery.  The search function only answers for 'test_decoder' while
# StatefulIncrementalDecoder.codecEnabled is true; it is disabled by
# default and tests enable it when they need it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1769
1770
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, is_final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001813
1814class TextIOWrapperTest(unittest.TestCase):
1815
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, plus the same
    # text decoded with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start each test without a leftover scratch file.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001820
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001823
def test_constructor(self):
    # Calling __init__ again on an existing wrapper must apply the new
    # settings and reject invalid newline arguments.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
1837
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    # detach() hands back the wrapped buffer object itself.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw object yet ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... but after detach() the written text is there.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, text.detach)
1850
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name on the raw object only the encoding is shown.
    unnamed = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), unnamed)

    # A text name appears with a u'' prefix ...
    raw.name = "dummy"
    with_text_name = "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), with_text_name)

    # ... and a bytes name without one.
    raw.name = b"dummy"
    with_bytes_name = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), with_bytes_name)
1864
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    text.write("X")
    # No flush happened
    self.assertEqual(raw.getvalue(), b"")
    text.write("Y\nZ")
    # All got flushed
    self.assertEqual(raw.getvalue(), b"XY\nZ")
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001875
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    # No explicit encoding: a default must still be filled in.
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # codecs.lookup() raises LookupError for an unknown encoding name.
    codecs.lookup(t.encoding)
1884
def test_encoding_errors_reading(self):
    # The byte 0xff is not valid ASCII; check each error policy on read.
    # (1) default
    raw = self.BytesIO(b"abc\n\xff\n")
    reader = self.TextIOWrapper(raw, encoding="ascii")
    self.assertRaises(UnicodeError, reader.read)
    # (2) explicit strict
    raw = self.BytesIO(b"abc\n\xff\n")
    reader = self.TextIOWrapper(raw, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, reader.read)
    # (3) ignore
    raw = self.BytesIO(b"abc\n\xff\n")
    reader = self.TextIOWrapper(raw, encoding="ascii", errors="ignore")
    self.assertEqual(reader.read(), "abc\n\n")
    # (4) replace
    raw = self.BytesIO(b"abc\n\xff\n")
    reader = self.TextIOWrapper(raw, encoding="ascii", errors="replace")
    self.assertEqual(reader.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001902
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII; check each error policy on write.
    # (1) default
    raw = self.BytesIO()
    writer = self.TextIOWrapper(raw, encoding="ascii")
    self.assertRaises(UnicodeError, writer.write, "\xff")
    # (2) explicit strict
    raw = self.BytesIO()
    writer = self.TextIOWrapper(raw, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, writer.write, "\xff")
    # (3) ignore
    raw = self.BytesIO()
    writer = self.TextIOWrapper(raw, encoding="ascii", errors="ignore",
                                newline="\n")
    writer.write("abc\xffdef\n")
    writer.flush()
    self.assertEqual(raw.getvalue(), b"abcdef\n")
    # (4) replace
    raw = self.BytesIO()
    writer = self.TextIOWrapper(raw, encoding="ascii", errors="replace",
                                newline="\n")
    writer.write("abc\xffdef\n")
    writer.flush()
    self.assertEqual(raw.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001926
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected back) pairs.
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def _collect(textio, use_reads):
        # Gather lines either by iterating, or by a two-character read()
        # followed by readline() for the rest of the line.
        if not use_reads:
            return list(textio)
        lines = []
        for head in iter(lambda: textio.read(2), ''):
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())
        return lines

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = _collect(textio, do_reads)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines readlines() must return) pairs.
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # A full read() after rewinding must match the joined lines.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001984
def test_newlines_output(self):
    # Expected raw bytes for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002002
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # object after the pending text has been written to it.
    seen_at_close = []
    parent = self.BytesIO
    class MyBytesIO(parent):
        def close(self):
            # Record the contents at the moment close() is called.
            seen_at_close.append(self.getvalue())
            parent.close(self)
    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00002016
def test_override_destructor(self):
    # Overridden __del__, close and flush must run in that order when the
    # wrapper is collected.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent destructor only when one exists.
            parent_del = getattr(super(MyTextIO, self), '__del__', None)
            if parent_del is not None:
                parent_del()
        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()
        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()
    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2039
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def _touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, _touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002054
2055 # Systematic tests of the text I/O API
2056
def test_basic_io(self):
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    encodings = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunksize in chunk_sizes:
        for enc in encodings:
            # Create the file with three characters in it.
            stream = self.open(support.TESTFN, "w+", encoding=enc)
            stream._CHUNK_SIZE = chunksize
            self.assertEqual(stream.write("abc"), 3)
            stream.close()

            # Reopen it and check reading, seeking, telling and appending.
            stream = self.open(support.TESTFN, "r+", encoding=enc)
            stream._CHUNK_SIZE = chunksize
            self.assertEqual(stream.tell(), 0)
            self.assertEqual(stream.read(), "abc")
            end_cookie = stream.tell()
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.read(None), "abc")
            stream.seek(0)
            self.assertEqual(stream.read(2), "ab")
            self.assertEqual(stream.read(1), "c")
            self.assertEqual(stream.read(1), "")
            self.assertEqual(stream.read(), "")
            self.assertEqual(stream.tell(), end_cookie)
            self.assertEqual(stream.seek(0), 0)
            self.assertEqual(stream.seek(0, 2), end_cookie)
            self.assertEqual(stream.write("def"), 3)
            self.assertEqual(stream.seek(end_cookie), end_cookie)
            self.assertEqual(stream.read(), "def")
            if enc.startswith("utf"):
                self.multi_line_test(stream, enc)
            stream.close()
2085
def multi_line_test(self, f, enc):
    """Write lines of many lengths to f, then read them back.

    Every (tell() position, line) pair recorded while writing must be
    reproduced when the file is read again with readline().  The enc
    argument is accepted but not used here.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = "".join([sample[i % period] for i in range(size)]) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        start = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((start, line))
    self.assertEqual(read_back, written)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107
def test_telling(self):
    stream = self.open(support.TESTFN, "w+", encoding="utf8")
    text_line = "\xff\n"
    # Record the position before the first write and after each write.
    marks = [stream.tell()]
    for _ in range(2):
        stream.write(text_line)
        marks.append(stream.tell())
    stream.seek(0)
    self.assertEqual(stream.tell(), marks[0])
    for mark in marks[1:]:
        self.assertEqual(stream.readline(), text_line)
        self.assertEqual(stream.tell(), mark)
    stream.seek(0)
    for line in stream:
        self.assertEqual(line, "\xff\n")
        # tell() is refused while iterating over the file.
        self.assertRaises(IOError, stream.tell)
    self.assertEqual(stream.tell(), marks[2])
    stream.close()
2127
def test_seeking(self):
    # The ASCII prefix is two bytes shorter than the value returned by
    # _default_chunk_size(), so the multi-byte UTF-8 sequence for U+8888
    # that follows it extends past that offset.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use a with block so the reader is closed even if an assertion fails;
    # previously this handle was never closed.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use a with block so the reader is always closed; previously this
    # handle was left open when the test finished.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # Neither call may raise; their results are not checked.
        f.readline()
        f.tell()
2157
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def _check_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        writer = self.open(support.TESTFN, 'wb')
        writer.write(data)
        writer.close()
        reader = self.open(support.TESTFN, encoding='test_decoder')
        reader._CHUNK_SIZE = CHUNK_SIZE
        decoded = reader.read()
        reader.close()

        total = len(decoded)
        for start in range(min_pos, total + 1): # seek positions
            for length in [1, 5, total - start]: # read lengths
                reader = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(reader.read(start), decoded[:start])
                cookie = reader.tell()
                self.assertEqual(reader.read(length),
                                 decoded[start:start + length])
                reader.seek(cookie)
                self.assertEqual(reader.read(), decoded[start:])
                reader.close()

    cases = StatefulIncrementalDecoderTest.test_cases

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case in cases:
            _check_data(case[0])

        # Position each test case so that it crosses a chunk boundary.
        for case in cases:
            payload = case[0]
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            _check_data(padding + payload, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002204
def test_encoded_writes(self):
    # Issue1753: with BOM-carrying encodings, two consecutive write() calls
    # must produce exactly the bytes of one encode() of the joined text.
    payload = "1234567890"
    doubled = payload * 2
    for codec in ("utf-16", "utf-16-le", "utf-16-be",
                  "utf-32", "utf-32-le", "utf-32-be"):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=codec)
        for _ in range(2):
            wrapper.write(payload)
        # Read the whole stream back twice, rewinding before each pass.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002224
def test_unreadable(self):
    # read() on a wrapper whose underlying buffer reports
    # readable() == False must raise IOError.
    class WriteOnlyBytes(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(WriteOnlyBytes())
    with self.assertRaises(IOError):
        wrapper.read()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002231
def test_read_one_by_one(self):
    # Reading a single character at a time must still turn "\r\n"
    # into "\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    pieces = []
    char = wrapper.read(1)
    while char:
        pieces.append(char)
        char = wrapper.read(1)
    self.assertEqual("".join(pieces), "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002241
def test_readlines(self):
    # readlines() with no hint, with a None hint, and with a hint of 5.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2249
# Read the stream in requests of 128 characters each.
def test_read_by_chunk(self):
    # 127 filler bytes place the "\r\n" pair across the 128-character mark.
    filler = b"A" * 127
    wrapper = self.TextIOWrapper(self.BytesIO(filler + b"\r\nB"))
    pieces = []
    chunk = wrapper.read(128)
    while chunk:
        pieces.append(chunk)
        chunk = wrapper.read(128)
    self.assertEqual("".join(pieces), "A" * 127 + "\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002261
def test_writelines(self):
    # writelines() on a plain list writes the items back to back.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2269
def test_writelines_userlist(self):
    # Same as the plain-list case, but the lines come in a UserList.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    lines = UserList(['ab', 'cd', 'ef'])
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
2277
def test_writelines_error(self):
    # Each of these arguments must be rejected with TypeError.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_lines)
2283
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # Consume the stream one character at a time until it is exhausted.
    pieces = []
    char = wrapper.read(1)
    while char:
        pieces.append(char)
        char = wrapper.read(1)
    self.assertEqual("".join(pieces), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002295
def test_issue1395_2(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Consume the stream four characters at a time until it is exhausted.
    pieces = []
    chunk = wrapper.read(4)
    while chunk:
        pieces.append(chunk)
        chunk = wrapper.read(4)
    self.assertEqual("".join(pieces), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002307
def test_issue1395_3(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Two sized reads followed by three readline() calls.
    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002318
def test_issue1395_4(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # One sized read, then read() for everything that is left.
    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002326
def test_issue1395_5(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Advance four characters, remember the position, rewind, and then
    # return to the remembered position before reading again.
    wrapper.read(4)
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002336
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2342
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            # tell() is still called here; its result is not used.
            writer.tell()
        with self.open(path, 'rb') as reader:
            expected = 'aaa'.encode(codec)
            self.assertEqual(reader.read(), expected)

        with self.open(path, 'a', encoding=codec) as appender:
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            expected = 'aaaxxx'.encode(codec)
            self.assertEqual(reader.read(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002357
def test_seek_bom(self):
    # Like the append case, but the write position is reached with an
    # explicit seek() instead of append mode.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=codec) as updater:
            # First write at the old end of the text, then overwrite
            # from the start.
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            expected = 'bbbzzz'.encode(codec)
            self.assertEqual(reader.read(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002372
def test_errors_property(self):
    # .errors is "strict" when not given, otherwise the value passed in.
    cases = (
        ({}, "strict"),
        (dict(errors="replace"), "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as stream:
            self.assertEqual(stream.errors, expected)
2378
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_gate = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as stream:
        def worker(index):
            # Build the line first, then block until every thread is
            # released at once.
            line = "Thread%03d\n" % index
            start_gate.wait()
            stream.write(line)
        workers = [threading.Thread(target=worker, args=(index,))
                   for index in range(20)]
        for thread in workers:
            thread.start()
        time.sleep(0.02)
        start_gate.set()
        for thread in workers:
            thread.join()
    with self.open(support.TESTFN) as stream:
        content = stream.read()
        # Each thread's line must appear exactly once.
        for index in range(20):
            occurrences = content.count("Thread%03d\n" % index)
            self.assertEqual(occurrences, 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002400
def test_flush_error_on_close(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    def failing_flush():
        raise IOError()
    wrapper.flush = failing_flush
    # The error raised by flush() must propagate out of close() ...
    with self.assertRaises(IOError):
        wrapper.close()
    # ... and the wrapper must report itself closed afterwards.
    self.assertTrue(wrapper.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002408
def test_multi_close(self):
    # Repeated close() calls are accepted; flush() afterwards raises
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2415
def test_readonly_attributes(self):
    # Rebinding .buffer must fail with AttributeError or TypeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises((AttributeError, TypeError)):
        wrapper.buffer = replacement
2421
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After a re-__init__ with an invalid newline argument, read()
        # must raise ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for init_error, bad_newline in ((TypeError, 42),
                                        (ValueError, 'xyzzy')):
            self.assertRaises(init_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cyclic collector can reclaim the wrapper.
        wrapper.x = wrapper
        probe = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(probe() is None, probe)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # Tie the two wrappers into a reference cycle.
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2462
2463
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Inherits every TextIOWrapperTest case unchanged.
    # NOTE(review): nothing here selects an implementation -- presumably
    # the io classes used via self.* are injected elsewhere; confirm.
    pass
2466
2467
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(raw, text, **decode_args):
            # Decode once, rewind to the saved state, decode again: this
            # exercises getstate() / setstate() as well as decode().  The
            # decoder is left in the state reached by the second decode.
            saved = decoder.getstate()
            first = decoder.decode(raw, **decode_args)
            self.assertEqual(first, text)
            decoder.setstate(saved)
            second = decoder.decode(raw, **decode_args)
            self.assertEqual(second, text)

        expect(b'\xe8\xa2\x88', "\u8888")

        # The same three-byte character fed one byte at a time, twice over.
        for _ in range(2):
            for raw, text in ((b'\xe8', ""),
                              (b'\xa2', ""),
                              (b'\x88', "\u8888")):
                expect(raw, text)

        # A truncated sequence must be reported once input is final.
        expect(b'\xe8', "")
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', final=True)

        decoder.reset()
        expect(b'\n', "\n")
        expect(b'\r', "")
        expect(b'', "\n", final=True)
        expect(b'\r', "\n", final=True)

        # The remaining steps run in this exact order; each one depends on
        # the state left behind by the previous one.
        remaining_steps = (
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for raw, text in remaining_steps:
            expect(raw, text)

    def check_newline_decoding(self, decoder, encoding):
        pieces = []
        if encoding is None:
            encoder = None
            def feed(text):
                # Decode one char at a time
                for char in text:
                    pieces.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    pieces.append(decoder.decode(byte))

        self.assertEqual(decoder.newlines, None)
        # (text fed, value expected in decoder.newlines afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the newline record must be empty again.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            # Neither character may be recorded as a newline.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.newlines, None)
                self.assertEqual(dec.decode(char), char)
            self.assertEqual(dec.newlines, None)
2573
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Inherits every IncrementalNewlineDecoderTest case unchanged.
    # NOTE(review): self.IncrementalNewlineDecoder is not assigned here --
    # presumably injected elsewhere; confirm.
    pass
2576
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Inherits every IncrementalNewlineDecoderTest case unchanged.
    # NOTE(review): self.IncrementalNewlineDecoder is not assigned here --
    # presumably injected elsewhere; confirm.
    pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002579
Christian Heimes1a6387e2008-03-26 12:49:49 +00002580
2581# XXX Tests for open()
2582
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks on the io module as a whole.  The module under
    # test is reached through self.io.
    # NOTE(review): self.open, self.IOBase, self.BlockingIOError and the
    # other self.* io names are not assigned in this class -- presumably
    # injected elsewhere; confirm.

    def tearDown(self):
        # Remove the scratch file the tests create.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name listed in __all__ must exist.  "open" is skipped; names
        # containing "error" (or UnsupportedOperation) must be Exception
        # subclasses; everything else except SEEK_* must subclass IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # .name and .mode as reported at each layer of the stack returned
        # by open().
        f = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        f = self.open(support.TESTFN, "U")
        self.assertEqual(f.name,                support.TESTFN)
        self.assertEqual(f.buffer.name,         support.TESTFN)
        self.assertEqual(f.buffer.raw.name,     support.TESTFN)
        self.assertEqual(f.mode,                "U")
        self.assertEqual(f.buffer.mode,         "rb")
        self.assertEqual(f.buffer.raw.mode,     "rb")
        f.close()

        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode,                "w+")
        self.assertEqual(f.buffer.mode,         "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode,     "rb+")

        # Second stream over the same descriptor; closefd=False is passed so
        # that f and g can both be closed below.
        g = self.open(f.fileno(), "wb", closefd=False)
        self.assertEqual(g.mode,     "wb")
        self.assertEqual(g.raw.mode, "wb")
        self.assertEqual(g.name,     f.fileno())
        self.assertEqual(g.raw.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # For every open() configuration below, each I/O method called on
        # the closed stream must raise ValueError.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Methods that exist only on some stream types are checked
            # only where present.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Bytes for binary modes, text otherwise.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        # A reference cycle between the exception and its (subclassed)
        # message string must be collectable.
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of `abcmodule` each kind of stream is an
        instance of: unbuffered binary, buffered binary, and text."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        # 16 KiB buffer.
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        # 1 KiB buffer.
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the file status flags of descriptor `fd`."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe using streams opened with
        buffering=`bufsize`, and check that the bytes read back equal the
        bytes recorded as written."""
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Keep writing until the write raises BlockingIOError.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Only the part reported as written counts as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Retry flush(), reading from the other end after every
            # failure, until it succeeds.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect the rest: keep calling rf.read() until it returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
2782
class CMiscIOTest(MiscIOTest):
    # Module that the inherited tests reach through self.io.
    io = io
2785
class PyMiscIOTest(MiscIOTest):
    # Module that the inherited tests reach through self.io.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002788
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002789
2790@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2791class SignalsTest(unittest.TestCase):
2792
def setUp(self):
    # Install alarm_interrupt for SIGALRM and remember the handler that
    # was in place before.
    previous_handler = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous_handler
2795
def tearDown(self):
    # Reinstall the SIGALRM handler stored in self.oldalrm.
    saved_handler = self.oldalrm
    signal.signal(signal.SIGALRM, saved_handler)
2798
def alarm_interrupt(self, sig, frame):
    # Signal handler whose only effect is to raise ZeroDivisionError.
    1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002801
@unittest.skipUnless(threading, 'Threading required for this test.')
@unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    item -- data written to the stream, repeated 1024 * 1024 times.
    bytes -- expected raw data; its first two bytes are compared with
        the two bytes read back from the pipe.
    fdopen_kwargs -- keyword arguments forwarded to self.io.open().
    """
    read_results = []
    def _read():
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    # Bound up front so the cleanup below can tell whether open() succeeded.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        signal.alarm(1)
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        self.assertRaises(ZeroDivisionError,
                          wio.write, item * (1024 * 1024))
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        # Cancel the alarm in case we got here before it fired, so that it
        # cannot go off later on.
        signal.alarm(0)
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
2844
def test_interrupted_write_unbuffered(self):
    # Binary stream opened with buffering=0.
    payload = b"xy"
    self.check_interrupted_write(payload, payload,
                                 mode="wb", buffering=0)
2847
def test_interrupted_write_buffered(self):
    # Binary stream opened without a buffering argument.
    payload = b"xy"
    self.check_interrupted_write(payload, payload, mode="wb")
2850
def test_interrupted_write_text(self):
    # Text stream: text goes in, ASCII bytes are expected on the pipe.
    text, encoded = "xy", b"xy"
    self.check_interrupted_write(text, encoded,
                                 mode="w", encoding="ascii")
2853
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Check what happens when a signal handler writes to the stream that
    the interrupted code is itself writing to.

    data -- item written repeatedly, both by the main loop and by the
        signal handler.
    fdopen_kwargs -- keyword arguments forwarded to self.io.open().
    """
    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1//0
    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
            while 1:
                for i in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        exc = cm.exception
        if isinstance(exc, RuntimeError):
            self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
    finally:
        # Cancel the alarm in case we got here before it fired; otherwise
        # on_alarm could run during the close() below or later on.
        signal.alarm(0)
        try:
            wio.close()
        finally:
            # Release the read end even if closing the writer fails.
            os.close(r)
2879
def test_reentrant_write_buffered(self):
    # Binary stream.
    payload = b"xy"
    self.check_reentrant_write(payload, mode="wb")
2882
def test_reentrant_write_text(self):
    # Text stream, ASCII-encoded.
    payload = "xy"
    self.check_reentrant_write(payload, mode="w", encoding="ascii")
2885
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    decode -- callable applied to the value returned by read(6) before it
        is compared with "foobar".
    fdopen_kwargs -- keyword arguments forwarded to self.io.open();
        "closefd" is forced to False.
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound up front so the cleanup below can tell whether open() succeeded.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Cancel the alarm in case we got here before it fired, so the
        # handler cannot write to a closed descriptor later on.
        signal.alarm(0)
        try:
            if rio is not None:
                rio.close()
        finally:
            # closefd=False: both pipe ends are ours to close.
            os.close(w)
            os.close(r)
2908
2909 def test_interrupterd_read_retry_buffered(self):
2910 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2911 mode="rb")
2912
2913 def test_interrupterd_read_retry_text(self):
2914 self.check_interrupted_read_retry(lambda x: x,
2915 mode="r")
2916
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is a one-element bytes or text value that is repeated N times
    to build the payload; *fdopen_kwargs* are forwarded to
    ``self.io.open()`` for the write end of a pipe (``closefd`` is forced
    to False because the descriptors are closed explicitly here).
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = 1024 * 1024
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    def _read():
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        # First alarm: re-arm with the handler that starts the reader.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        t.start()
    signal.signal(signal.SIGALRM, alarm1)
    # Bound up front so the cleanup below works even if open() fails.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel any alarm still pending so neither handler can fire once
        # the descriptors have been closed.
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
2971
2972 def test_interrupterd_write_retry_buffered(self):
2973 self.check_interrupted_write_retry(b"x", mode="wb")
2974
2975 def test_interrupterd_write_retry_text(self):
2976 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2977
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002978
class CSignalsTest(SignalsTest):
    # Run the inherited signal tests against the `io` module.
    io = io
2981
class PySignalsTest(SignalsTest):
    # Run the inherited signal tests against the `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
2989
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002990
def test_main():
    """Inject the io namespaces into the test classes, then run them all."""
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]

    # One namespace per implementation, keyed by the test-class name prefix.
    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        namespaces[prefix] = dict(
            (member, getattr(module, member)) for member in all_members)

    # Each mock has a "C..." and a "Py..." variant among the module globals.
    globs = globals()
    for prefix in ("C", "Py"):
        namespaces[prefix].update(
            (mock.__name__, globs[prefix + mock.__name__]) for mock in mocks)

    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test in tests:
        # Classes whose name matches neither prefix are left untouched.
        for prefix in ("C", "Py"):
            if test.__name__.startswith(prefix):
                for attr, value in namespaces[prefix].items():
                    setattr(test, attr, value)
                break

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003025
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()