blob: d10f52a5528b2fb779a4af32eb9fe329b03e8d60 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020037from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000038from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000039
40import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000051
52__metaclass__ = type
53bytes = support.py3k_bytes
54
55def _default_chunk_size():
56 """Get the default TextIOWrapper chunk size"""
57 with io.open(__file__, "r", encoding="latin1") as f:
58 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60
class MockRawIOWithoutRead:
    """Scripted raw stream that intentionally provides no read() method.

    Because read() is missing, a RawIOBase subclass built on this mixin falls
    back on the default RawIOBase.read(), which is implemented on top of
    readinto().  The constructor takes the sequence of chunks that successive
    readinto() calls will hand out; a ``None`` entry makes that call return
    ``None``.  Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._reads = 0              # number of readinto() calls so far
        self._extraneous_reads = 0   # calls made after the script ran dry
        self._write_stack = []
        self._read_stack = list(read_stack)

    # -- capability and bookkeeping stubs ---------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # Not a real position, but callers only need *some* integer back.
        return 0

    def tell(self):
        # Same placeholder value as seek().
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer -------------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report that all of it was written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes copied, ``None`` when the next scripted
        entry is ``None``, or 0 once the script is exhausted.  A chunk larger
        than *buf* is split: the tail stays queued for the next call.
        """
        self._reads += 1
        room = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > room:
            # Fill the whole buffer and keep the remainder for later.
            buf[:] = chunk[:room]
            self._read_stack[0] = chunk[room:]
            return room
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
116
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead bound to the C implementation of RawIOBase."""
119
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead bound to the pure-Python RawIOBase."""
122
123
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also implements read().

    Each read() call hands out the next entry of the scripted read stack
    unchanged (including ``None`` entries), ignoring the requested size.
    """

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        *n* is accepted for interface compatibility but is not consulted.
        Every call increments ``_reads``; calls made after the script has
        run dry additionally increment ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script is expected here; anything else
            # (e.g. KeyboardInterrupt) must propagate instead of being
            # silently turned into an end-of-file marker.
            self._extraneous_reads += 1
            return b""
133
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO bound to the C implementation of RawIOBase."""
136
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO bound to the pure-Python RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000139
140
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods report deliberately bogus results.

    Each override still performs the MockRawIO operation where there is one,
    but then lies about the outcome: doubled byte counts and data, negative
    stream positions, and a readinto() count larger than the buffer.
    """

    def tell(self):
        return -456

    def seek(self, pos, whence):
        return -123

    def write(self, b):
        # Claim twice as many bytes as the underlying mock accepted.
        accepted = MockRawIO.write(self, b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def readinto(self, buf):
        # Fill the buffer normally, then report five times its length.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
157
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO bound to the C implementation of RawIOBase."""
160
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO bound to the pure-Python RawIOBase."""
163
164
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() call fails with IOError.

    The ``closed`` flag is set before the error is raised, so any later
    close() call returns quietly.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
172
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO bound to the C implementation of RawIOBase."""
175
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO bound to the pure-Python RawIOBase."""
178
179
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It is meant to be combined with a concrete stream class placed after it
    in the bases list; all real work is delegated there through super().
    ``read_history`` receives one entry per call: the length of the data
    returned by read() (or ``None`` if read() returned ``None``), or the
    value returned by readinto().
    """

    def __init__(self, data):
        # Create the log first so it exists even if the base initializer
        # performs reads.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000195
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation of BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000198
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the pure-Python BytesIO."""
201
202
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a non-blocking "would block" stall.

    Written data accumulates in ``_write_stack`` until collected with
    pop_written().  After block_on(char), a write whose data contains that
    char is cut short just before it; a write that *starts* with the char
    returns ``None`` and disarms the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far as one bytes object and reset."""
        collected = b"".join(self._write_stack)
        del self._write_stack[:]
        return collected

    def write(self, b):
        """Store *b* (or its prefix before the blocker) and return the count.

        Returns ``None`` instead of a count when the very first byte is the
        blocker char; in that case nothing is stored and the blocker is
        cancelled so the next write goes through.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        # No blocker armed, or the blocker does not occur in this payload.
        self._write_stack.append(payload)
        return len(payload)
246
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO bound to the C implementation of RawIOBase."""

    # Exposes the matching BlockingIOError class as an attribute.
    BlockingIOError = io.BlockingIOError
249
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO bound to the pure-Python RawIOBase."""

    # Exposes the matching BlockingIOError class as an attribute.
    BlockingIOError = pyio.BlockingIOError
252
Christian Heimes1a6387e2008-03-26 12:49:49 +0000253
class IOTest(unittest.TestCase):
    # Implementation-independent tests of open(), FileIO, BytesIO and the
    # abstract base classes.  The names self.open, self.FileIO, self.BytesIO,
    # self.IOBase, self.RawIOBase, self.BufferedIOBase, self.TextIOBase and
    # self.MockRawIOWithoutRead are not defined in this class; they are
    # expected to be supplied on the concrete subclasses.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence against writable *f*.

        The sequence leaves b"hello world\\n" as the stream contents.
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must leave the stream position untouched.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncating does not move the position.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek sequence against readable *f*.

        *f* must contain b"hello world\\n".  With *buffered* true, read()
        without a size argument is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes remain; the rest of the buffer keeps its old data.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests: just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset LARGE in *f*.

        *f* must be both readable and writable.
        """
        # Use unittest assertions rather than bare ``assert`` so the checks
        # survive running the test suite under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() to a smaller size does not move the position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # buffering=0 requests an unbuffered (raw) binary stream.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report the test as skipped (as test_unbounded_file does)
                # instead of printing to stderr and silently passing.
                self.skipTest(
                    "test requires %d bytes and a long time to run; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Each overridden hook appends its own marker, so ``record`` shows
        # the order in which destruction invoked them.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    # The base class may not define __del__ at all.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a *base* subclass instance runs
        __del__, close() and flush(), in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        # write() must report the size in bytes, not in array elements.
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; closing it must
            # make the wrapper unusable.
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.read(), "egg\n")
            fobj.seek(0)
            fobj.close()
            self.assertRaises(ValueError, fobj.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: only the cycle collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
608
609
class CIOTest(IOTest):
    # IOTest specialisation carrying a check specific to the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        probe = weakref.ref(cyclic)
        # Drop the class first, then the instance, so that both are only
        # reachable through the cycle when the collector runs.
        del MyIO
        del cyclic
        support.gc_collect()
        self.assertTrue(probe() is None, probe)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000629
class PyIOTest(IOTest):
    # IOTest specialisation in which the inherited test_array_writes is
    # replaced by a copy wrapped in unittest.skip.
    _skip_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    test_array_writes = _skip_array_writes(IOTest.test_array_writes)
    # The decorator was only a construction aid; keep it out of the class.
    del _skip_array_writes
Christian Heimes1a6387e2008-03-26 12:49:49 +0000634
635
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The class under test is taken from self.tp and the raw-stream mocks
    # from self.MockRawIO / self.CloseFailureIO; none of these are defined
    # here.

    def test_detach(self):
        raw = self.MockRawIO()
        wrapper = self.tp(raw)
        # detach() hands back the raw object once, then refuses.
        self.assertIs(wrapper.detach(), raw)
        self.assertRaises(ValueError, wrapper.detach)

    def test_fileno(self):
        wrapper = self.tp(self.MockRawIO())
        # 42 is the descriptor number reported by the raw mock.
        self.assertEqual(42, wrapper.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass

    def test_invalid_args(self):
        wrapper = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, wrapper.seek, 0, bad_whence)

    def test_override_destructor(self):
        base = self.tp
        calls = []
        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # Chain to the parent's __del__ only if it has one.
                parent_del = getattr(super(MyBufferedIO, self), "__del__", None)
                if parent_del is not None:
                    parent_del()
            def close(self):
                calls.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                calls.append(3)
                super(MyBufferedIO, self).flush()
        wrapper = MyBufferedIO(self.MockRawIO())
        # Remember this before the object goes away: flush() is only
        # expected in the record for writable objects.
        expected = [1, 2, 3] if wrapper.writable() else [1, 2]
        del wrapper
        support.gc_collect()
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        wrapper = self.tp(self.MockRawIO())
        def enter_and_leave():
            with wrapper:
                pass
        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(raw).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = captured.getvalue().strip()
        if not message:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(message.splitlines()), 1)
        self.assertTrue(message.startswith("Exception IOError: "), message)
        self.assertTrue(message.endswith(" ignored"), message)

    def test_repr(self):
        raw = self.MockRawIO()
        wrapper = self.tp(raw)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        # Without a name on the raw object only the class is shown.
        self.assertEqual(repr(wrapper), "<%s>" % qualified)
        # A text (unicode) name is shown with its u'' prefix ...
        raw.name = "dummy"
        self.assertEqual(repr(wrapper), "<%s name=u'dummy'>" % qualified)
        # ... and a bytes name without it.
        raw.name = b"dummy"
        self.assertEqual(repr(wrapper), "<%s name='dummy'>" % qualified)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def failing_flush():
            raise IOError()
        raw.flush = failing_flush
        wrapper = self.tp(raw)
        self.assertRaises(IOError, wrapper.close) # exception not swallowed

    def test_multi_close(self):
        wrapper = self.tp(self.MockRawIO())
        # Closing repeatedly is allowed; using the closed object is not.
        for _ in range(3):
            wrapper.close()
        self.assertRaises(ValueError, wrapper.flush)

    def test_readonly_attributes(self):
        wrapper = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        # Rebinding the ``raw`` attribute must be rejected.
        with self.assertRaises((AttributeError, TypeError)):
            wrapper.raw = replacement
750
Christian Heimes1a6387e2008-03-26 12:49:49 +0000751
class SizeofTest:
    # Mixin checking that sys.getsizeof() of a buffered object tracks the
    # requested buffer size.  Relies on self.tp and self.MockRawIO, which are
    # not defined here.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        # Size of the object with the buffer itself factored out.
        small_obj = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_obj) - small
        # A differently sized buffer must change the total by exactly the
        # difference in buffer size.
        large_obj = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_obj), overhead + large)
764
765
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Read-side tests for a buffered reader class.  The class under test is
    # taken from ``self.tp``; the mock raw streams come from attributes such
    # as ``self.MockRawIO`` that this class does not define itself.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ can be called again on a live object; buffer sizes <= 0
        # are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        # Re-initialising with a fresh raw stream after the failures works.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) both return the full 7 bytes.
        for size in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (size given to read1, expected data, expected rawio._reads after)
        steps = [
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        ]
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        target = bytearray(2)
        # The same 2-byte target is reused for every call, so a short read
        # leaves the tail of the previous contents in place ("gf").
        expectations = [
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        ]
        for nread, contents in expectations:
            self.assertEqual(bufio.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        # Each call gets a fresh reader over the same three raw chunks.
        def make_bufio():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(make_bufio().readlines(), every_line)
        self.assertEqual(make_bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(make_bufio().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # raw read sizes expected to be recorded in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                expected = data[offset:offset+nbytes]
                self.assertEqual(bufio.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        rawio = self.MockRawIO(chunks)
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # Same check directly on the raw object's readall().
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() with no argument returns all the raw chunks joined.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=reader) for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02) # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # With a misbehaving raw stream, seek() and tell() raise IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        for operation, args in ((bufio.seek, (0,)), (bufio.tell, ())):
            self.assertRaises(IOError, operation, *args)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
947
948
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests (plus the sizeof check) against
    # io.BufferedReader, and adds checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__, read() on the object raises ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; remove it once the test is over
        # so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so only the cyclic GC can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000989
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000992
993
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Write-side tests for a buffered writer class.  The class under test is
    # taken from ``self.tp``; the mock raw streams come from attributes such
    # as ``self.MockRawIO`` that this class does not define itself.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ can be called again on a live object; buffer sizes <= 0
        # are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() pushes the pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the last write so it does not run past the end.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative) between writes.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start after seeking back, then append at the end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but the lines come in a UserList.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items and a non-iterable argument raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Dropping the last reference flushes buffered data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; remove it afterwards so it does not leak
        # into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() leaves the stream position where it was.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-cut the data into alternating 1- and 19-byte pieces that
            # the worker threads pop and write.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # With a misbehaving raw stream, seek(), tell() and an over-sized
        # write all raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument triggers a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001232
1233
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests (plus the sizeof check) against
    # io.BufferedWriter, and adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__, write() on the object raises ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; remove it afterwards so it does not leak
        # into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so only the cyclic GC can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1271
1272
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001275
class BufferedRWPairTest(unittest.TestCase):
    # Tests for a reader/writer pair class taken from ``self.tp``, which is
    # constructed as tp(reader, writer).

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair cannot be detached.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument triggers a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) behaves like read() with no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Cover the three call forms: no argument, None, and a size hint.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write followed by flush() reaches the writer as its own chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() does not consume: the following read() still sees "abc".
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports isatty() true when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1393
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
1396
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001399
1400
Antoine Pitrou19690592009-06-12 20:14:08 +00001401class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1402 read_mode = "rb+"
1403 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001404
Antoine Pitrou19690592009-06-12 20:14:08 +00001405 def test_constructor(self):
1406 BufferedReaderTest.test_constructor(self)
1407 BufferedWriterTest.test_constructor(self)
1408
1409 def test_read_and_write(self):
1410 raw = self.MockRawIO((b"asdf", b"ghjk"))
1411 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001412
1413 self.assertEqual(b"as", rw.read(2))
1414 rw.write(b"ddd")
1415 rw.write(b"eee")
1416 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001417 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001418 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001419
Antoine Pitrou19690592009-06-12 20:14:08 +00001420 def test_seek_and_tell(self):
1421 raw = self.BytesIO(b"asdfghjkl")
1422 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001423
Ezio Melotti2623a372010-11-21 13:34:58 +00001424 self.assertEqual(b"as", rw.read(2))
1425 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001426 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001427 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001428
Antoine Pitrou808cec52011-08-20 15:40:58 +02001429 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001430 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001431 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001432 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001433 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001434 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001435 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001436 self.assertEqual(7, rw.tell())
1437 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001438 rw.flush()
1439 self.assertEqual(b"asdf123fl", raw.getvalue())
1440
Christian Heimes1a6387e2008-03-26 12:49:49 +00001441 self.assertRaises(TypeError, rw.seek, 0.0)
1442
Antoine Pitrou19690592009-06-12 20:14:08 +00001443 def check_flush_and_read(self, read_func):
1444 raw = self.BytesIO(b"abcdefghi")
1445 bufio = self.tp(raw)
1446
Ezio Melotti2623a372010-11-21 13:34:58 +00001447 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001448 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001449 self.assertEqual(b"ef", read_func(bufio, 2))
1450 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001451 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001452 self.assertEqual(6, bufio.tell())
1453 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001454 raw.seek(0, 0)
1455 raw.write(b"XYZ")
1456 # flush() resets the read buffer
1457 bufio.flush()
1458 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001459 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001460
1461 def test_flush_and_read(self):
1462 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1463
1464 def test_flush_and_readinto(self):
1465 def _readinto(bufio, n=-1):
1466 b = bytearray(n if n >= 0 else 9999)
1467 n = bufio.readinto(b)
1468 return bytes(b[:n])
1469 self.check_flush_and_read(_readinto)
1470
1471 def test_flush_and_peek(self):
1472 def _peek(bufio, n=-1):
1473 # This relies on the fact that the buffer can contain the whole
1474 # raw stream, otherwise peek() can return less.
1475 b = bufio.peek(n)
1476 if n != -1:
1477 b = b[:n]
1478 bufio.seek(len(b), 1)
1479 return b
1480 self.check_flush_and_read(_peek)
1481
1482 def test_flush_and_write(self):
1483 raw = self.BytesIO(b"abcdefghi")
1484 bufio = self.tp(raw)
1485
1486 bufio.write(b"123")
1487 bufio.flush()
1488 bufio.write(b"45")
1489 bufio.flush()
1490 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001491 self.assertEqual(b"12345fghi", raw.getvalue())
1492 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001493
1494 def test_threads(self):
1495 BufferedReaderTest.test_threads(self)
1496 BufferedWriterTest.test_threads(self)
1497
1498 def test_writes_and_peek(self):
1499 def _peek(bufio):
1500 bufio.peek(1)
1501 self.check_writes(_peek)
1502 def _peek(bufio):
1503 pos = bufio.tell()
1504 bufio.seek(-1, 1)
1505 bufio.peek(1)
1506 bufio.seek(pos, 0)
1507 self.check_writes(_peek)
1508
1509 def test_writes_and_reads(self):
1510 def _read(bufio):
1511 bufio.seek(-1, 1)
1512 bufio.read(1)
1513 self.check_writes(_read)
1514
1515 def test_writes_and_read1s(self):
1516 def _read1(bufio):
1517 bufio.seek(-1, 1)
1518 bufio.read1(1)
1519 self.check_writes(_read1)
1520
1521 def test_writes_and_readintos(self):
1522 def _read(bufio):
1523 bufio.seek(-1, 1)
1524 bufio.readinto(bytearray(1))
1525 self.check_writes(_read)
1526
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001527 def test_write_after_readahead(self):
1528 # Issue #6629: writing after the buffer was filled by readahead should
1529 # first rewind the raw stream.
1530 for overwrite_size in [1, 5]:
1531 raw = self.BytesIO(b"A" * 10)
1532 bufio = self.tp(raw, 4)
1533 # Trigger readahead
1534 self.assertEqual(bufio.read(1), b"A")
1535 self.assertEqual(bufio.tell(), 1)
1536 # Overwriting should rewind the raw stream if it needs so
1537 bufio.write(b"B" * overwrite_size)
1538 self.assertEqual(bufio.tell(), overwrite_size + 1)
1539 # If the write size was smaller than the buffer size, flush() and
1540 # check that rewind happens.
1541 bufio.flush()
1542 self.assertEqual(bufio.tell(), overwrite_size + 1)
1543 s = raw.getvalue()
1544 self.assertEqual(s,
1545 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1546
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001547 def test_write_rewind_write(self):
1548 # Various combinations of reading / writing / seeking backwards / writing again
1549 def mutate(bufio, pos1, pos2):
1550 assert pos2 >= pos1
1551 # Fill the buffer
1552 bufio.seek(pos1)
1553 bufio.read(pos2 - pos1)
1554 bufio.write(b'\x02')
1555 # This writes earlier than the previous write, but still inside
1556 # the buffer.
1557 bufio.seek(pos1)
1558 bufio.write(b'\x01')
1559
1560 b = b"\x80\x81\x82\x83\x84"
1561 for i in range(0, len(b)):
1562 for j in range(i, len(b)):
1563 raw = self.BytesIO(b)
1564 bufio = self.tp(raw, 100)
1565 mutate(bufio, i, j)
1566 bufio.flush()
1567 expected = bytearray(b)
1568 expected[j] = 2
1569 expected[i] = 1
1570 self.assertEqual(raw.getvalue(), expected,
1571 "failed result for i=%d, j=%d" % (i, j))
1572
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001573 def test_truncate_after_read_or_write(self):
1574 raw = self.BytesIO(b"A" * 10)
1575 bufio = self.tp(raw, 100)
1576 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1577 self.assertEqual(bufio.truncate(), 2)
1578 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1579 self.assertEqual(bufio.truncate(), 4)
1580
Antoine Pitrou19690592009-06-12 20:14:08 +00001581 def test_misbehaved_io(self):
1582 BufferedReaderTest.test_misbehaved_io(self)
1583 BufferedWriterTest.test_misbehaved_io(self)
1584
Antoine Pitrou808cec52011-08-20 15:40:58 +02001585 def test_interleaved_read_write(self):
1586 # Test for issue #12213
1587 with self.BytesIO(b'abcdefgh') as raw:
1588 with self.tp(raw, 100) as f:
1589 f.write(b"1")
1590 self.assertEqual(f.read(1), b'b')
1591 f.write(b'2')
1592 self.assertEqual(f.read1(1), b'd')
1593 f.write(b'3')
1594 buf = bytearray(1)
1595 f.readinto(buf)
1596 self.assertEqual(buf, b'f')
1597 f.write(b'4')
1598 self.assertEqual(f.peek(1), b'h')
1599 f.flush()
1600 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1601
1602 with self.BytesIO(b'abc') as raw:
1603 with self.tp(raw, 100) as f:
1604 self.assertEqual(f.read(1), b'a')
1605 f.write(b"2")
1606 self.assertEqual(f.read(1), b'c')
1607 f.flush()
1608 self.assertEqual(raw.getvalue(), b'a2c')
1609
1610 def test_interleaved_readline_write(self):
1611 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1612 with self.tp(raw) as f:
1613 f.write(b'1')
1614 self.assertEqual(f.readline(), b'b\n')
1615 f.write(b'2')
1616 self.assertEqual(f.readline(), b'def\n')
1617 f.write(b'3')
1618 self.assertEqual(f.readline(), b'\n')
1619 f.flush()
1620 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1621
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Asking for a sys.maxsize-byte buffer must fail with one of these.
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader's check first, then the writer's.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1639
# Runs the BufferedRandomTest suite with pyio.BufferedRandom as the type
# under test.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1642
1643
Christian Heimes1a6387e2008-03-26 12:49:49 +00001644# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1645# properties:
1646# - A single output character can correspond to many bytes of input.
1647# - The number of input bytes to complete the character can be
1648# undetermined until the last input byte is received.
1649# - The number of input bytes can vary depending on previous input.
1650# - A single input byte can correspond to many characters of output.
1651# - The number of output characters can be undetermined until the
1652# last input byte is received.
1653# - The number of output characters can vary depending on previous input.
1654
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), flags packing I and O."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text decoded so far.

        When *final* is true, a non-empty buffer is emitted as a last word.
        """
        output = ''
        period = ord('.')
        # Iterate over a bytearray so that every item is an integer,
        # whichever bytes-like type was passed in; comparing against a
        # one-character string would never match integer items and periods
        # would silently stop terminating words.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for 'test_decoder'.

        Returns a CodecInfo using this class as incremental decoder when
        the codec is enabled and *name* matches; otherwise returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1739
# Register the lookup function of the decoder defined above for testing.
# It only answers for 'test_decoder' while codecEnabled is true, which is
# off by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1743
1744
Christian Heimes1a6387e2008-03-26 12:49:49 +00001745class StatefulIncrementalDecoderTest(unittest.TestCase):
1746 """
1747 Make sure the StatefulIncrementalDecoder actually works.
1748 """
1749
1750 test_cases = [
1751 # I=1, O=1 (fixed-length input == fixed-length output)
1752 (b'abcd', False, 'a.b.c.d.'),
1753 # I=0, O=0 (variable-length input, variable-length output)
1754 (b'oiabcd', True, 'abcd.'),
1755 # I=0, O=0 (should ignore extra periods)
1756 (b'oi...abcd...', True, 'abcd.'),
1757 # I=0, O=6 (variable-length input, fixed-length output)
1758 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1759 # I=2, O=6 (fixed-length input < fixed-length output)
1760 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1761 # I=6, O=3 (fixed-length input > fixed-length output)
1762 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1763 # I=0, then 3; O=29, then 15 (with longer output)
1764 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1765 'a----------------------------.' +
1766 'b----------------------------.' +
1767 'cde--------------------------.' +
1768 'abcdefghijabcde.' +
1769 'a.b------------.' +
1770 '.c.------------.' +
1771 'd.e------------.' +
1772 'k--------------.' +
1773 'l--------------.' +
1774 'm--------------.')
1775 ]
1776
Antoine Pitrou19690592009-06-12 20:14:08 +00001777 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001778 # Try a few one-shot test cases.
1779 for input, eof, output in self.test_cases:
1780 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001781 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001782
1783 # Also test an unfinished decode, followed by forcing EOF.
1784 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001785 self.assertEqual(d.decode(b'oiabcd'), '')
1786 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001787
1788class TextIOWrapperTest(unittest.TestCase):
1789
1790 def setUp(self):
1791 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1792 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001793 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001794
    def tearDown(self):
        # Remove the scratch file (support.TESTFN) after each test.
        support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001797
Antoine Pitrou19690592009-06-12 20:14:08 +00001798 def test_constructor(self):
1799 r = self.BytesIO(b"\xc3\xa9\n\n")
1800 b = self.BufferedReader(r, 1000)
1801 t = self.TextIOWrapper(b)
1802 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001803 self.assertEqual(t.encoding, "latin1")
1804 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001805 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001806 self.assertEqual(t.encoding, "utf8")
1807 self.assertEqual(t.line_buffering, True)
1808 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001809 self.assertRaises(TypeError, t.__init__, b, newline=42)
1810 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1811
1812 def test_detach(self):
1813 r = self.BytesIO()
1814 b = self.BufferedWriter(r)
1815 t = self.TextIOWrapper(b)
1816 self.assertIs(t.detach(), b)
1817
1818 t = self.TextIOWrapper(b, encoding="ascii")
1819 t.write("howdy")
1820 self.assertFalse(r.getvalue())
1821 t.detach()
1822 self.assertEqual(r.getvalue(), b"howdy")
1823 self.assertRaises(ValueError, t.detach)
1824
1825 def test_repr(self):
1826 raw = self.BytesIO("hello".encode("utf-8"))
1827 b = self.BufferedReader(raw)
1828 t = self.TextIOWrapper(b, encoding="utf-8")
1829 modname = self.TextIOWrapper.__module__
1830 self.assertEqual(repr(t),
1831 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1832 raw.name = "dummy"
1833 self.assertEqual(repr(t),
1834 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1835 raw.name = b"dummy"
1836 self.assertEqual(repr(t),
1837 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1838
1839 def test_line_buffering(self):
1840 r = self.BytesIO()
1841 b = self.BufferedWriter(r, 1000)
1842 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1843 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001844 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001845 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001846 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001847 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001848 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001849
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 def test_encoding(self):
1851 # Check the encoding attribute is always set, and valid
1852 b = self.BytesIO()
1853 t = self.TextIOWrapper(b, encoding="utf8")
1854 self.assertEqual(t.encoding, "utf8")
1855 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001856 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001857 codecs.lookup(t.encoding)
1858
1859 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001860 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001861 b = self.BytesIO(b"abc\n\xff\n")
1862 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001863 self.assertRaises(UnicodeError, t.read)
1864 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001865 b = self.BytesIO(b"abc\n\xff\n")
1866 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001867 self.assertRaises(UnicodeError, t.read)
1868 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001869 b = self.BytesIO(b"abc\n\xff\n")
1870 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001871 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001872 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001873 b = self.BytesIO(b"abc\n\xff\n")
1874 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001875 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876
Antoine Pitrou19690592009-06-12 20:14:08 +00001877 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001878 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001879 b = self.BytesIO()
1880 t = self.TextIOWrapper(b, encoding="ascii")
1881 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001883 b = self.BytesIO()
1884 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1885 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001887 b = self.BytesIO()
1888 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001890 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001891 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001892 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001893 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001894 b = self.BytesIO()
1895 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001897 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001898 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001899 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001900
Antoine Pitrou19690592009-06-12 20:14:08 +00001901 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001902 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1903
1904 tests = [
1905 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1906 [ '', input_lines ],
1907 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1908 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1909 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1910 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001911 encodings = (
1912 'utf-8', 'latin-1',
1913 'utf-16', 'utf-16-le', 'utf-16-be',
1914 'utf-32', 'utf-32-le', 'utf-32-be',
1915 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001916
1917 # Try a range of buffer sizes to test the case where \r is the last
1918 # character in TextIOWrapper._pending_line.
1919 for encoding in encodings:
1920 # XXX: str.encode() should return bytes
1921 data = bytes(''.join(input_lines).encode(encoding))
1922 for do_reads in (False, True):
1923 for bufsize in range(1, 10):
1924 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001925 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1926 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001927 encoding=encoding)
1928 if do_reads:
1929 got_lines = []
1930 while True:
1931 c2 = textio.read(2)
1932 if c2 == '':
1933 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001934 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935 got_lines.append(c2 + textio.readline())
1936 else:
1937 got_lines = list(textio)
1938
1939 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001940 self.assertEqual(got_line, exp_line)
1941 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001942
Antoine Pitrou19690592009-06-12 20:14:08 +00001943 def test_newlines_input(self):
1944 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001945 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1946 for newline, expected in [
1947 (None, normalized.decode("ascii").splitlines(True)),
1948 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001949 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1950 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1951 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001952 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001953 buf = self.BytesIO(testdata)
1954 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001955 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001956 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001957 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001958
Antoine Pitrou19690592009-06-12 20:14:08 +00001959 def test_newlines_output(self):
1960 testdict = {
1961 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1962 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1963 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1964 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1965 }
1966 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1967 for newline, expected in tests:
1968 buf = self.BytesIO()
1969 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1970 txt.write("AAA\nB")
1971 txt.write("BB\nCCC\n")
1972 txt.write("X\rY\r\nZ")
1973 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001974 self.assertEqual(buf.closed, False)
1975 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001976
1977 def test_destructor(self):
1978 l = []
1979 base = self.BytesIO
1980 class MyBytesIO(base):
1981 def close(self):
1982 l.append(self.getvalue())
1983 base.close(self)
1984 b = MyBytesIO()
1985 t = self.TextIOWrapper(b, encoding="ascii")
1986 t.write("abc")
1987 del t
1988 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001989 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001990
1991 def test_override_destructor(self):
1992 record = []
1993 class MyTextIO(self.TextIOWrapper):
1994 def __del__(self):
1995 record.append(1)
1996 try:
1997 f = super(MyTextIO, self).__del__
1998 except AttributeError:
1999 pass
2000 else:
2001 f()
2002 def close(self):
2003 record.append(2)
2004 super(MyTextIO, self).close()
2005 def flush(self):
2006 record.append(3)
2007 super(MyTextIO, self).flush()
2008 b = self.BytesIO()
2009 t = MyTextIO(b, encoding="ascii")
2010 del t
2011 support.gc_collect()
2012 self.assertEqual(record, [1, 2, 3])
2013
2014 def test_error_through_destructor(self):
2015 # Test that the exception state is not modified by a destructor,
2016 # even if close() fails.
2017 rawio = self.CloseFailureIO()
2018 def f():
2019 self.TextIOWrapper(rawio).xyzzy
2020 with support.captured_output("stderr") as s:
2021 self.assertRaises(AttributeError, f)
2022 s = s.getvalue().strip()
2023 if s:
2024 # The destructor *may* have printed an unraisable error, check it
2025 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002026 self.assertTrue(s.startswith("Exception IOError: "), s)
2027 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002028
2029 # Systematic tests of the text I/O API
2030
Antoine Pitrou19690592009-06-12 20:14:08 +00002031 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002032 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2033 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002034 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002035 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002036 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002037 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002038 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002039 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002040 self.assertEqual(f.tell(), 0)
2041 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002042 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002043 self.assertEqual(f.seek(0), 0)
2044 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002045 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002046 self.assertEqual(f.read(2), "ab")
2047 self.assertEqual(f.read(1), "c")
2048 self.assertEqual(f.read(1), "")
2049 self.assertEqual(f.read(), "")
2050 self.assertEqual(f.tell(), cookie)
2051 self.assertEqual(f.seek(0), 0)
2052 self.assertEqual(f.seek(0, 2), cookie)
2053 self.assertEqual(f.write("def"), 3)
2054 self.assertEqual(f.seek(cookie), cookie)
2055 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002056 if enc.startswith("utf"):
2057 self.multi_line_test(f, enc)
2058 f.close()
2059
2060 def multi_line_test(self, f, enc):
2061 f.seek(0)
2062 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002063 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002064 wlines = []
2065 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2066 chars = []
2067 for i in range(size):
2068 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002069 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002070 wlines.append((f.tell(), line))
2071 f.write(line)
2072 f.seek(0)
2073 rlines = []
2074 while True:
2075 pos = f.tell()
2076 line = f.readline()
2077 if not line:
2078 break
2079 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002080 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002081
Antoine Pitrou19690592009-06-12 20:14:08 +00002082 def test_telling(self):
2083 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002084 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002085 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002086 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002087 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002088 p2 = f.tell()
2089 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002090 self.assertEqual(f.tell(), p0)
2091 self.assertEqual(f.readline(), "\xff\n")
2092 self.assertEqual(f.tell(), p1)
2093 self.assertEqual(f.readline(), "\xff\n")
2094 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002095 f.seek(0)
2096 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002097 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002098 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002099 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002100 f.close()
2101
Antoine Pitrou19690592009-06-12 20:14:08 +00002102 def test_seeking(self):
2103 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002104 prefix_size = chunk_size - 2
2105 u_prefix = "a" * prefix_size
2106 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002107 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002108 u_suffix = "\u8888\n"
2109 suffix = bytes(u_suffix.encode("utf-8"))
2110 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002111 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002112 f.write(line*2)
2113 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002114 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002115 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002116 self.assertEqual(s, prefix.decode("ascii"))
2117 self.assertEqual(f.tell(), prefix_size)
2118 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002119
Antoine Pitrou19690592009-06-12 20:14:08 +00002120 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002121 # Regression test for a specific bug
2122 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002123 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002124 f.write(data)
2125 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002126 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127 f._CHUNK_SIZE # Just test that it exists
2128 f._CHUNK_SIZE = 2
2129 f.readline()
2130 f.tell()
2131
Antoine Pitrou19690592009-06-12 20:14:08 +00002132 def test_seek_and_tell(self):
2133 #Test seek/tell using the StatefulIncrementalDecoder.
2134 # Make test faster by doing smaller seeks
2135 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002136
Antoine Pitrou19690592009-06-12 20:14:08 +00002137 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002138 """Tell/seek to various points within a data stream and ensure
2139 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002140 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002141 f.write(data)
2142 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002143 f = self.open(support.TESTFN, encoding='test_decoder')
2144 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145 decoded = f.read()
2146 f.close()
2147
2148 for i in range(min_pos, len(decoded) + 1): # seek positions
2149 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002150 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002151 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002152 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002153 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002154 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002155 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156 f.close()
2157
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002158 # Enable the test decoder.
2159 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002160
2161 # Run the tests.
2162 try:
2163 # Try each test case.
2164 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002165 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002166
2167 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2169 offset = CHUNK_SIZE - len(input)//2
2170 prefix = b'.'*offset
2171 # Don't bother seeking into the prefix (takes too long).
2172 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002173 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002174
2175 # Ensure our test decoder won't interfere with subsequent tests.
2176 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002177 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178
Antoine Pitrou19690592009-06-12 20:14:08 +00002179 def test_encoded_writes(self):
2180 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002181 tests = ("utf-16",
2182 "utf-16-le",
2183 "utf-16-be",
2184 "utf-32",
2185 "utf-32-le",
2186 "utf-32-be")
2187 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002188 buf = self.BytesIO()
2189 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002190 # Check if the BOM is written only once (see issue1753).
2191 f.write(data)
2192 f.write(data)
2193 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002194 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002195 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002196 self.assertEqual(f.read(), data * 2)
2197 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002198
Antoine Pitrou19690592009-06-12 20:14:08 +00002199 def test_unreadable(self):
2200 class UnReadable(self.BytesIO):
2201 def readable(self):
2202 return False
2203 txt = self.TextIOWrapper(UnReadable())
2204 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002205
Antoine Pitrou19690592009-06-12 20:14:08 +00002206 def test_read_one_by_one(self):
2207 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002208 reads = ""
2209 while True:
2210 c = txt.read(1)
2211 if not c:
2212 break
2213 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002214 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002215
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002216 def test_readlines(self):
2217 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2218 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2219 txt.seek(0)
2220 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2221 txt.seek(0)
2222 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2223
Christian Heimes1a6387e2008-03-26 12:49:49 +00002224 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002225 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002226 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002227 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002228 reads = ""
2229 while True:
2230 c = txt.read(128)
2231 if not c:
2232 break
2233 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002234 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002236 def test_writelines(self):
2237 l = ['ab', 'cd', 'ef']
2238 buf = self.BytesIO()
2239 txt = self.TextIOWrapper(buf)
2240 txt.writelines(l)
2241 txt.flush()
2242 self.assertEqual(buf.getvalue(), b'abcdef')
2243
2244 def test_writelines_userlist(self):
2245 l = UserList(['ab', 'cd', 'ef'])
2246 buf = self.BytesIO()
2247 txt = self.TextIOWrapper(buf)
2248 txt.writelines(l)
2249 txt.flush()
2250 self.assertEqual(buf.getvalue(), b'abcdef')
2251
2252 def test_writelines_error(self):
2253 txt = self.TextIOWrapper(self.BytesIO())
2254 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2255 self.assertRaises(TypeError, txt.writelines, None)
2256 self.assertRaises(TypeError, txt.writelines, b'abc')
2257
Christian Heimes1a6387e2008-03-26 12:49:49 +00002258 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002259 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002260
2261 # read one char at a time
2262 reads = ""
2263 while True:
2264 c = txt.read(1)
2265 if not c:
2266 break
2267 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002268 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002269
2270 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002271 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002272 txt._CHUNK_SIZE = 4
2273
2274 reads = ""
2275 while True:
2276 c = txt.read(4)
2277 if not c:
2278 break
2279 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002280 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002281
2282 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002283 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002284 txt._CHUNK_SIZE = 4
2285
2286 reads = txt.read(4)
2287 reads += txt.read(4)
2288 reads += txt.readline()
2289 reads += txt.readline()
2290 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002291 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002292
2293 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002294 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002295 txt._CHUNK_SIZE = 4
2296
2297 reads = txt.read(4)
2298 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002299 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002300
2301 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002302 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002303 txt._CHUNK_SIZE = 4
2304
2305 reads = txt.read(4)
2306 pos = txt.tell()
2307 txt.seek(0)
2308 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002309 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002310
2311 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002312 buffer = self.BytesIO(self.testdata)
2313 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002314
2315 self.assertEqual(buffer.seekable(), txt.seekable())
2316
Antoine Pitrou19690592009-06-12 20:14:08 +00002317 def test_append_bom(self):
2318 # The BOM is not written again when appending to a non-empty file
2319 filename = support.TESTFN
2320 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2321 with self.open(filename, 'w', encoding=charset) as f:
2322 f.write('aaa')
2323 pos = f.tell()
2324 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002325 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002326
2327 with self.open(filename, 'a', encoding=charset) as f:
2328 f.write('xxx')
2329 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002330 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002331
Antoine Pitrou19690592009-06-12 20:14:08 +00002332 def test_seek_bom(self):
2333 # Same test, but when seeking manually
2334 filename = support.TESTFN
2335 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2336 with self.open(filename, 'w', encoding=charset) as f:
2337 f.write('aaa')
2338 pos = f.tell()
2339 with self.open(filename, 'r+', encoding=charset) as f:
2340 f.seek(pos)
2341 f.write('zzz')
2342 f.seek(0)
2343 f.write('bbb')
2344 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002345 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002346
2347 def test_errors_property(self):
2348 with self.open(support.TESTFN, "w") as f:
2349 self.assertEqual(f.errors, "strict")
2350 with self.open(support.TESTFN, "w", errors="replace") as f:
2351 self.assertEqual(f.errors, "replace")
2352
Victor Stinner6a102812010-04-27 23:55:59 +00002353 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002354 def test_threads_write(self):
2355 # Issue6750: concurrent writes could duplicate data
2356 event = threading.Event()
2357 with self.open(support.TESTFN, "w", buffering=1) as f:
2358 def run(n):
2359 text = "Thread%03d\n" % n
2360 event.wait()
2361 f.write(text)
2362 threads = [threading.Thread(target=lambda n=x: run(n))
2363 for x in range(20)]
2364 for t in threads:
2365 t.start()
2366 time.sleep(0.02)
2367 event.set()
2368 for t in threads:
2369 t.join()
2370 with self.open(support.TESTFN) as f:
2371 content = f.read()
2372 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002373 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002374
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002375 def test_flush_error_on_close(self):
2376 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2377 def bad_flush():
2378 raise IOError()
2379 txt.flush = bad_flush
2380 self.assertRaises(IOError, txt.close) # exception not swallowed
2381
2382 def test_multi_close(self):
2383 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2384 txt.close()
2385 txt.close()
2386 txt.close()
2387 self.assertRaises(ValueError, txt.flush)
2388
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002389 def test_readonly_attributes(self):
2390 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2391 buf = self.BytesIO(self.testdata)
2392 with self.assertRaises((AttributeError, TypeError)):
2393 txt.buffer = buf
2394
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # A failed re-initialisation (bad newline argument) must leave the
        # wrapper in a state where read() raises ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for bad_newline, init_error in ((42, TypeError),
                                        ('xyzzy', ValueError)):
            self.assertRaises(init_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Make the wrapper part of a reference cycle so that only the
        # garbage collector can reclaim it.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references between the two wrappers
            text_a.buddy, text_b.buddy = text_b, text_a
        support.gc_collect()
2435
2436
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the inherited TextIOWrapperTest cases with no additions.

    Presumably bound to the pure-Python implementation elsewhere in this
    file -- nothing in this class itself selects an implementation.
    """
2439
2440
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def verify(raw, expected, **options):
            # Decode the same input twice from the same saved state, so that
            # getstate() / setstate() are exercised as well as decode().
            saved = decoder.getstate()
            first = decoder.decode(raw, **options)
            self.assertEqual(first, expected)
            decoder.setstate(saved)
            second = decoder.decode(raw, **options)
            self.assertEqual(second, expected)

        verify(b'\xe8\xa2\x88', "\u8888")

        # The same three-byte character, fed one byte at a time, twice over.
        bytewise = ((b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"))
        for _ in range(2):
            for raw, expected in bytewise:
                verify(raw, expected)

        # A lone lead byte followed by a final empty decode is an error.
        verify(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        verify(b'\n', "\n")
        verify(b'\r', "")
        verify(b'', "\n", final=True)
        verify(b'\r', "\n", final=True)

        remaining = (
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for raw, expected in remaining:
            verify(raw, expected)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder one unit at a time and follow how its
        # ``newlines`` attribute evolves.  With an encoding, the text is
        # encoded first and each item of the encoded result is decoded
        # separately; with ``encoding`` None the text itself is fed one
        # char at a time.
        decoded = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            units = text if encoder is None else encoder.encode(text)
            for unit in units:
                decoded.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must have forgotten the newlines seen.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            # Neither character is a newline, so both must pass through
            # unchanged without being recorded in ``newlines``.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2546
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged.

    Presumably bound to the C implementation elsewhere in this file --
    nothing in this class itself selects an implementation.
    """
2549
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the inherited IncrementalNewlineDecoderTest cases unchanged.

    Presumably bound to the pure-Python implementation elsewhere in this
    file -- nothing in this class itself selects an implementation.
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002552
Christian Heimes1a6387e2008-03-26 12:49:49 +00002553
2554# XXX Tests for open()
2555
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the io implementation bound to ``self.io``
    # by the concrete subclasses.  The other attributes used here
    # (``self.open``, ``self.IOBase``, ``self.BlockingIOError``, ...) are
    # not set in this class; presumably they are injected elsewhere in the
    # file -- confirm against test_main().

    def tearDown(self):
        # Every test works on support.TESTFN; remove it afterwards.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every public name must exist; apart from open() and the SEEK_*
        # constants, each must be an exception class or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Check the name and mode attributes at each layer of the stack.
        # Handles are closed even when an assertion fails, so that a failure
        # does not leave the file open while tearDown() unlinks it.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        f = self.open(support.TESTFN, "w+")
        g = None
        try:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor, which it does
            # not own (closefd=False).
            g = self.open(f.fileno(), "wb", closefd=False)
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
        finally:
            f.close()
            if g is not None:
                g.close()

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # for each combination of mode and buffering below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Not every layer has the following methods; test the ones
            # that exist on this particular object.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Write bytes to binary files and text to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        # Build a reference cycle between the exception and its message and
        # check that the garbage collector can reclaim it.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # Check which of abcmodule's ABCs the objects returned by
        # self.open() are instances of: unbuffered binary -> raw only,
        # buffered binary -> buffered only, text -> text only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        # Add O_NONBLOCK to the status flags of file descriptor ``fd``.
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        # Write through a non-blocking pipe with buffers of ``bufsize``
        # bytes until the writer blocks, drain, and finally check that
        # everything sent was received, in order.
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        # One lowercase letter repeated N times.  Going
                        # through bytearray gives a single byte on both
                        # Python 2 and 3; bytes([x]) on Python 2 would be
                        # the repr of the list ('[97]') instead.
                        msg = bytes(bytearray([i % 26 + 97])) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Only the part actually accepted counts as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush what is still buffered, draining the pipe whenever the
            # flush would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Read until rf.read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
2755
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases with ``self.io`` bound to the module-level
    # name ``io`` (presumably the C-accelerated io module -- confirm against
    # the file's imports).
    io = io
2758
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases with ``self.io`` bound to the module-level
    # name ``pyio`` (presumably the pure-Python implementation -- confirm
    # against the file's imports).
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002761
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002762
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks of how reads and writes interact with SIGALRM.  Concrete
    # subclasses choose the implementation under test through ``self.io``.
    #
    # Each helper cancels any still-pending alarm in its cleanup, so that a
    # check failing before the alarm fired cannot deliver a stray SIGALRM
    # later on.

    def setUp(self):
        # Install our handler and remember the previous one for tearDown().
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from the signal handler.
        1 // 0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        ``item`` is repeated to build the data written, ``bytes`` is the
        byte string expected on the read end, and ``fdopen_kwargs`` are
        passed to ``self.io.open()`` for the write end of the pipe.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound before the try block so that cleanup works even when
        # open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case it has not fired yet.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler that interrupts
        another write() on the same file object.

        Either the reentrant call fails with RuntimeError("reentrant
        call ...") or the handler's ZeroDivisionError propagates.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1//0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm first so on_alarm() cannot run during cleanup.
            signal.alarm(0)
            try:
                if wio is not None:
                    wio.close()
                else:
                    # open() failed, so nothing owns the write end yet.
                    os.close(w)
            finally:
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        ``decode`` converts the value returned by read() into text for the
        comparison with "foobar".
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that cleanup works even when
        # open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case it has not fired yet.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupterd_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupterd_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        ``item`` is a single-character string (bytes or text) that is
        repeated N times to build the data written.
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: only re-arm, with alarm2 as the next handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can complete.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that cleanup works even when
        # open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel the alarm in case it has not fired yet.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupterd_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupterd_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2950
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002951
class CSignalsTest(SignalsTest):
    # Run the inherited signal tests against the module-level `io` name
    # (the right-hand side resolves to the global, not a class attribute).
    io = io
2954
class PySignalsTest(SignalsTest):
    # Run the inherited signal tests against the `pyio` implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
2962
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002963
def test_main():
    # All test cases to run; C and Python flavours are listed in pairs.
    test_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mock_classes = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
                    MockNonBlockWriterIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_namespace = dict((attr, getattr(io, attr)) for attr in exported)
    py_namespace = dict((attr, getattr(pyio, attr)) for attr in exported)

    # Each mock has a "C"-prefixed and a "Py"-prefixed variant among the
    # module globals; expose it under the unprefixed name.
    module_globals = globals()
    for mock in mock_classes:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mock_classes:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for case in test_classes:
        # The class-name prefix selects the namespace; classes with neither
        # prefix are left untouched.
        if case.__name__.startswith("C"):
            namespace = c_namespace
        elif case.__name__.startswith("Py"):
            namespace = py_namespace
        else:
            continue
        for attr, value in namespace.items():
            setattr(case, attr, value)

    support.run_unittest(*test_classes)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002998
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()