blob: e4507e444fc7d387bc6e3228f1db79661bf9f1c6 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000050
# Python 2 module-level metaclass hook: classes below that are declared
# without an explicit base become new-style classes.
__metaclass__ = type
# Rebind ``bytes`` for the rest of this module to the helper exported by
# test_support (presumably a Python 3-style bytes constructor -- confirm
# against test_support).
bytes = support.py3k_bytes
53
54def _default_chunk_size():
55 """Get the default TextIOWrapper chunk size"""
56 with io.open(__file__, "r", encoding="latin1") as f:
57 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000058
59
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately lacks read().

    Because only readinto() is provided, a RawIO base class mixed in by a
    subclass has to fall back on its default read(), which calls readinto().

    Bookkeeping attributes:
      _read_stack        queue of items still to be served by readinto()
      _write_stack       every chunk passed to write(), in order
      _reads             number of readinto() calls
      _extraneous_reads  number of readinto() calls made on an empty queue
    """

    def __init__(self, read_stack=()):
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    # -- capabilities: everything is claimed to be supported -------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning ------------------------------------------------------

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # just as wrong as seek()
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer ----------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the head of the read queue into *buf*.

        Returns 0 when the queue is empty, None when the head item is None
        (the item is consumed), and otherwise the number of bytes copied.
        An item larger than *buf* is served piecewise: the unread tail stays
        at the head of the queue.
        """
        self._reads += 1
        capacity = len(buf)
        queue = self._read_stack
        if not queue:
            self._extraneous_reads += 1
            return 0
        head = queue[0]
        if head is None:
            queue.pop(0)
            return None
        size = len(head)
        if size > capacity:
            # Fill the whole buffer and keep the remainder for later.
            buf[:] = head[:capacity]
            queue[0] = head[capacity:]
            return capacity
        queue.pop(0)
        buf[:size] = head
        return size
115
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
118
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
121
122
class MockRawIO(MockRawIOWithoutRead):
    """Raw stream mock that serves queued items through its own read()."""

    def read(self, n=None):
        """Return the next queued item; *n* is accepted but ignored.

        Every call is counted in ``_reads``.  Once the queue is exhausted
        the call is additionally counted in ``_extraneous_reads`` and
        ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means "end of data".  A bare
            # ``except:`` here would also swallow KeyboardInterrupt and
            # SystemExit and hide genuine errors.
            self._extraneous_reads += 1
            return b""
132
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
135
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000138
139
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods report implausible results.

    write() and read() double what MockRawIO returns, seek() and tell()
    answer with negative positions, and readinto() claims five times the
    size of the buffer it was given.
    """

    def write(self, b):
        reported = MockRawIO.write(self, b)
        return reported * 2

    def read(self, n=None):
        payload = MockRawIO.read(self, n)
        return payload * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # The real transfer still happens; only the reported count is bogus.
        MockRawIO.readinto(self, buf)
        buffer_size = len(buf)
        return buffer_size * 5
156
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
159
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
162
163
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() fails with IOError.

    The stream is flagged as closed before the error is raised, so any
    later close() call returns quietly.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
171
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
174
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
177
178
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    It must be combined with a class providing the real ``__init__(data)``,
    ``read`` and ``readinto``; those are reached through ``super()``.
    ``read_history`` collects, per call, the length of what read() returned
    (None when read() returned None) or the value readinto() returned.
    """

    def __init__(self, data):
        # The log has to exist before the cooperating base is initialised.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            logged = None
        else:
            logged = len(chunk)
        self.read_history.append(logged)
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000194
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000197
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the Python implementation's BytesIO."""
200
201
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    Written chunks pile up in ``_write_stack`` until pop_written() collects
    them.  After block_on(char), a write containing *char* is cut short just
    before it; a write that starts with *char* returns None and disarms the
    blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return everything written so far as one string and forget it."""
        collected = b"".join(self._write_stack)
        del self._write_stack[:]
        return collected

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def writable(self):
        return True

    def seekable(self):
        return True

    def readable(self):
        return True

    def write(self, b):
        """Store *b*, honouring the blocker character if one is armed.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker (the blocker is cancelled in that case).
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            position = payload.find(blocker)
            if position > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:position])
                return position
            if position == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        # No blocker armed, or it does not occur in this payload.
        self._write_stack.append(payload)
        return len(payload)
245
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    # Exposes the matching implementation's exception type on the class.
    BlockingIOError = io.BlockingIOError
248
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    # Exposes the matching implementation's exception type on the class.
    BlockingIOError = pyio.BlockingIOError
251
Christian Heimes1a6387e2008-03-26 12:49:49 +0000252
class IOTest(unittest.TestCase):
    """Implementation-independent tests for raw, buffered and text file I/O.

    The implementation under test is reached exclusively through attributes
    read from ``self``: ``open``, ``FileIO``, ``BytesIO``, ``IOBase``,
    ``RawIOBase``, ``BufferedIOBase``, ``TextIOBase`` and
    ``MockRawIOWithoutRead``.  None of them is defined in this class, so
    they have to be supplied from outside before the tests can run.
    """

    def setUp(self):
        # Unlink the scratch file so every test starts without it.
        support.unlink(support.TESTFN)

    def tearDown(self):
        # Unlink the scratch file the test may have created.
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/tell/truncate sequence against *f*.

        *f* must be an empty, writable, seekable binary stream.  On return
        it has been truncated to the 12 bytes b"hello world\\n" while its
        position is still 13.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek/tell sequence against *f*.

        *f* must be a readable, seekable binary stream positioned at the
        start of the 12 bytes b"hello world\\n".  With *buffered* true,
        argument-less read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left: the buffer keeps its size and its tail.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset ``LARGE`` in *f*.

        *f* must be readable and writable.
        """
        # TestCase assertions instead of bare ``assert`` so the checks are
        # not compiled away when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() does not move the position ...
        self.assertEqual(f.tell(), self.LARGE + 2)
        # ... but the end of file did move.
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary files: write pass, then read pass.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources: it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip (as test_unbounded_file does) rather than
                # printing to stderr and passing silently.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run them"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # Leaving the with block closes the file, normally or by exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying a FileIO subclass instance must run __del__, close()
        # and flush() in that order, and the written data must be on disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    # The base class does not define __del__.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the destruction call order for a subclass of *base*.

        Destroying an instance must call __del__, close() and flush() in
        that order, and the instance attributes must still be readable
        while those run.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the implicit close must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the array's size in bytes.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object sharing f's descriptor without owning it.
            alias = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(alias.read(), "egg\n")
            alias.seek(0)
            alias.close()
            self.assertRaises(ValueError, alias.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            # Close the alias explicitly instead of leaving it dangling; it
            # was opened with closefd=False.
            with self.open(f.fileno(), "r", closefd=False) as alias:
                self.assertEqual(alias.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Reference cycle through the instance itself.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        # Instance attribute shadowing the real flush().
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
607
608
class CIOTest(IOTest):
    # Adds one regression test on top of the shared IOTest suite.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # A throwaway instance populates the method cache.
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        # Drop the class and the instance; only the cycle keeps them alive.
        del MyIO, cyclic
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000628
class PyIOTest(IOTest):
    # Same suite as IOTest, with test_array_writes unconditionally skipped.

    @unittest.skip(
        "len(array.array) returns number of elements rather than bytelength")
    def test_array_writes(self):
        IOTest.test_array_writes(self)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000633
634
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The buffered type under test is read from self.tp; the raw stream
    # mocks are read from self.MockRawIO and self.CloseFailureIO.

    def test_detach(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        detached = buffered.detach()
        self.assertIs(detached, underlying)
        # A second detach() has nothing left to hand out.
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        buffered = self.tp(self.MockRawIO())
        # 42 is what the raw mock reports.
        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    inherited = super(MyBufferedIO, self).__del__
                except AttributeError:
                    # The base type has no __del__ to chain to.
                    return
                inherited()

            def close(self):
                calls.append(2)
                super(MyBufferedIO, self).close()

            def flush(self):
                calls.append(3)
                super(MyBufferedIO, self).flush()

        buffered = MyBufferedIO(self.MockRawIO())
        is_writable = buffered.writable()
        del buffered
        support.gc_collect()
        # flush() is only expected for writable objects.
        expected = [1, 2, 3] if is_writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_leave()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()

        def trigger():
            # Attribute lookup on a temporary buffered object.
            self.tp(failing_raw).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, trigger)
        output = captured.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception IOError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        underlying = self.MockRawIO()
        buffered = self.tp(underlying)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        # Without a name on the raw stream.
        self.assertEqual(repr(buffered), "<%s>" % qualified)
        # Unicode name.
        underlying.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name=u'dummy'>" % qualified)
        # Byte-string name.
        underlying.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % qualified)

    def test_flush_error_on_close(self):
        underlying = self.MockRawIO()

        def failing_flush():
            raise IOError()

        underlying.flush = failing_flush
        buffered = self.tp(underlying)
        # exception not swallowed
        with self.assertRaises(IOError):
            buffered.close()

    def test_multi_close(self):
        buffered = self.tp(self.MockRawIO())
        # Closing repeatedly is fine ...
        for _ in range(3):
            buffered.close()
        # ... flushing afterwards is not.
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_readonly_attributes(self):
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buffered.raw = replacement
749
Christian Heimes1a6387e2008-03-26 12:49:49 +0000750
class SizeofTest:
    # Mixin: self.tp and self.MockRawIO are read from the concrete test class.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must grow by exactly the difference in buffer size.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)
763
764
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader tests shared by every implementation.  The class under test is
    # looked up through ``self.tp``.
    read_mode = "rb"

    def test_constructor(self):
        source = self.MockRawIO([b"abc"])
        reader = self.tp(source)
        # Re-running __init__ on a live object with valid sizes is allowed.
        reader.__init__(source)
        for good_size in (1024, 16):
            reader.__init__(source, buffer_size=good_size)
        self.assertEqual(b"abc", reader.read())
        # Non-positive buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, source,
                              buffer_size=bad_size)
        # A valid re-initialisation after the failures works again.
        source = self.MockRawIO([b"abc"])
        reader.__init__(source)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        # Both None and an exact byte count return the whole stream.
        for size in (None, 7):
            reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        self.assertRaises(ValueError, reader.read, -2)

    def test_read1(self):
        source = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(source)
        self.assertEqual(b"a", reader.read(1))
        self.assertEqual(b"b", reader.read1(1))
        self.assertEqual(source._reads, 1)
        # (data returned by read1(100), raw reads counted afterwards)
        steps = (
            (b"c", 1),
            (b"d", 2),
            (b"efg", 3),
            (b"", 4),
        )
        for expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(100))
            self.assertEqual(source._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, reader.read1, -1)

    def test_readinto(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (bytes read, buffer contents afterwards).  The last two rows show
        # that bytes beyond the count read are left as they were.
        expectations = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for nread, contents in expectations:
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # (buffer size, sizes requested from the buffered reader, sizes the
        # raw stream is expected to record in read_history)
        scenarios = [
            (100, [3, 1, 4, 8], [total, 0]),
            (100, [3, 3, 3], [total]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        ]

        for bufsize, requested, expected_raw in scenarios:
            source = self.MockFileIO(data)
            reader = self.tp(source, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                chunk = reader.read(nbytes)
                self.assertEqual(chunk, data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(source.read_history, expected_raw)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        source = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(source)
        for expected, args in ((b"abcd", (6,)), (b"e", (1,)), (b"fg", ())):
            self.assertEqual(expected, reader.read(*args))
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # The raw object's own readall() shows the same None behaviour.
        source = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", source.readall())
        self.assertIsNone(source.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def consume():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for nbytes in cycle([1, 19]):
                            chunk = bufio.read(nbytes)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                workers = [threading.Thread(target=consume)
                           for _ in range(20)]
                for worker in workers:
                    worker.start()
                time.sleep(0.02) # yield
                for worker in workers:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must show up exactly N times overall.
                combined = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, reader.seek, 0)
        self.assertRaises(IOError, reader.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the
                # request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                source = self.MockRawIO(chunks)
                reader = self.tp(source, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(source._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, source._extraneous_reads))
946
947
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests, plus the tests below, against
    # io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # After each rejected __init__ call, read() is expected to raise
        # ValueError as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates support.TESTFN on disk; make sure it is removed
        # again whether the assertions below pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: the object can only be freed by the cycle
        # collector, not by plain reference counting.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000988
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000991
992
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer tests shared by every implementation.  The class under test is
    # looked up through ``self.tp``.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ on a live object with valid sizes is allowed.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Non-positive buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func(bufio)`` is invoked after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Absolute seeks that end up back at the current position.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # Same idea with relative seeks.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates support.TESTFN on disk; make sure it is removed
        # again whether the assertions below pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, but every byte value must be
            # present exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001209
1210
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests, plus the tests below, against
    # io.BufferedWriter.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After each rejected __init__ call, write() is expected to raise
        # ValueError as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates support.TESTFN on disk; make sure it is removed
        # again whether the assertions below pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: the object can only be freed by the cycle
        # collector, not by plain reference counting.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1248
1249
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001252
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair, shared by every implementation.  The
    # class under test is looked up through ``self.tp``.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None behaves like an omitted size.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like an omitted hint.  (This
        # assertion used to repeat the no-argument call, leaving the None
        # case untested here.)
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each flush must hand one chunk to the writer side.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # The read() that follows still returns the peeked bytes.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Raw stream whose isatty() answer is chosen at construction time.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1370
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
1373
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001376
1377
Antoine Pitrou19690592009-06-12 20:14:08 +00001378class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1379 read_mode = "rb+"
1380 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001381
Antoine Pitrou19690592009-06-12 20:14:08 +00001382 def test_constructor(self):
1383 BufferedReaderTest.test_constructor(self)
1384 BufferedWriterTest.test_constructor(self)
1385
1386 def test_read_and_write(self):
1387 raw = self.MockRawIO((b"asdf", b"ghjk"))
1388 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001389
1390 self.assertEqual(b"as", rw.read(2))
1391 rw.write(b"ddd")
1392 rw.write(b"eee")
1393 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001394 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001395 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001396
Antoine Pitrou19690592009-06-12 20:14:08 +00001397 def test_seek_and_tell(self):
1398 raw = self.BytesIO(b"asdfghjkl")
1399 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001400
Ezio Melotti2623a372010-11-21 13:34:58 +00001401 self.assertEqual(b"as", rw.read(2))
1402 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001403 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001404 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001405
Antoine Pitrou808cec52011-08-20 15:40:58 +02001406 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001407 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001408 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001409 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001410 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001411 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001412 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001413 self.assertEqual(7, rw.tell())
1414 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001415 rw.flush()
1416 self.assertEqual(b"asdf123fl", raw.getvalue())
1417
Christian Heimes1a6387e2008-03-26 12:49:49 +00001418 self.assertRaises(TypeError, rw.seek, 0.0)
1419
Antoine Pitrou19690592009-06-12 20:14:08 +00001420 def check_flush_and_read(self, read_func):
1421 raw = self.BytesIO(b"abcdefghi")
1422 bufio = self.tp(raw)
1423
Ezio Melotti2623a372010-11-21 13:34:58 +00001424 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001425 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001426 self.assertEqual(b"ef", read_func(bufio, 2))
1427 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001428 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001429 self.assertEqual(6, bufio.tell())
1430 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001431 raw.seek(0, 0)
1432 raw.write(b"XYZ")
1433 # flush() resets the read buffer
1434 bufio.flush()
1435 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001436 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001437
1438 def test_flush_and_read(self):
1439 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1440
1441 def test_flush_and_readinto(self):
1442 def _readinto(bufio, n=-1):
1443 b = bytearray(n if n >= 0 else 9999)
1444 n = bufio.readinto(b)
1445 return bytes(b[:n])
1446 self.check_flush_and_read(_readinto)
1447
1448 def test_flush_and_peek(self):
1449 def _peek(bufio, n=-1):
1450 # This relies on the fact that the buffer can contain the whole
1451 # raw stream, otherwise peek() can return less.
1452 b = bufio.peek(n)
1453 if n != -1:
1454 b = b[:n]
1455 bufio.seek(len(b), 1)
1456 return b
1457 self.check_flush_and_read(_peek)
1458
1459 def test_flush_and_write(self):
1460 raw = self.BytesIO(b"abcdefghi")
1461 bufio = self.tp(raw)
1462
1463 bufio.write(b"123")
1464 bufio.flush()
1465 bufio.write(b"45")
1466 bufio.flush()
1467 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001468 self.assertEqual(b"12345fghi", raw.getvalue())
1469 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001470
1471 def test_threads(self):
1472 BufferedReaderTest.test_threads(self)
1473 BufferedWriterTest.test_threads(self)
1474
1475 def test_writes_and_peek(self):
1476 def _peek(bufio):
1477 bufio.peek(1)
1478 self.check_writes(_peek)
1479 def _peek(bufio):
1480 pos = bufio.tell()
1481 bufio.seek(-1, 1)
1482 bufio.peek(1)
1483 bufio.seek(pos, 0)
1484 self.check_writes(_peek)
1485
1486 def test_writes_and_reads(self):
1487 def _read(bufio):
1488 bufio.seek(-1, 1)
1489 bufio.read(1)
1490 self.check_writes(_read)
1491
1492 def test_writes_and_read1s(self):
1493 def _read1(bufio):
1494 bufio.seek(-1, 1)
1495 bufio.read1(1)
1496 self.check_writes(_read1)
1497
1498 def test_writes_and_readintos(self):
1499 def _read(bufio):
1500 bufio.seek(-1, 1)
1501 bufio.readinto(bytearray(1))
1502 self.check_writes(_read)
1503
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001504 def test_write_after_readahead(self):
1505 # Issue #6629: writing after the buffer was filled by readahead should
1506 # first rewind the raw stream.
1507 for overwrite_size in [1, 5]:
1508 raw = self.BytesIO(b"A" * 10)
1509 bufio = self.tp(raw, 4)
1510 # Trigger readahead
1511 self.assertEqual(bufio.read(1), b"A")
1512 self.assertEqual(bufio.tell(), 1)
1513 # Overwriting should rewind the raw stream if it needs so
1514 bufio.write(b"B" * overwrite_size)
1515 self.assertEqual(bufio.tell(), overwrite_size + 1)
1516 # If the write size was smaller than the buffer size, flush() and
1517 # check that rewind happens.
1518 bufio.flush()
1519 self.assertEqual(bufio.tell(), overwrite_size + 1)
1520 s = raw.getvalue()
1521 self.assertEqual(s,
1522 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1523
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001524 def test_write_rewind_write(self):
1525 # Various combinations of reading / writing / seeking backwards / writing again
1526 def mutate(bufio, pos1, pos2):
1527 assert pos2 >= pos1
1528 # Fill the buffer
1529 bufio.seek(pos1)
1530 bufio.read(pos2 - pos1)
1531 bufio.write(b'\x02')
1532 # This writes earlier than the previous write, but still inside
1533 # the buffer.
1534 bufio.seek(pos1)
1535 bufio.write(b'\x01')
1536
1537 b = b"\x80\x81\x82\x83\x84"
1538 for i in range(0, len(b)):
1539 for j in range(i, len(b)):
1540 raw = self.BytesIO(b)
1541 bufio = self.tp(raw, 100)
1542 mutate(bufio, i, j)
1543 bufio.flush()
1544 expected = bytearray(b)
1545 expected[j] = 2
1546 expected[i] = 1
1547 self.assertEqual(raw.getvalue(), expected,
1548 "failed result for i=%d, j=%d" % (i, j))
1549
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001550 def test_truncate_after_read_or_write(self):
1551 raw = self.BytesIO(b"A" * 10)
1552 bufio = self.tp(raw, 100)
1553 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1554 self.assertEqual(bufio.truncate(), 2)
1555 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1556 self.assertEqual(bufio.truncate(), 4)
1557
def test_misbehaved_io(self):
    # Run the reader's misbehaved-raw-stream test first, then the writer's.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1561
Antoine Pitrou808cec52011-08-20 15:40:58 +02001562 def test_interleaved_read_write(self):
1563 # Test for issue #12213
1564 with self.BytesIO(b'abcdefgh') as raw:
1565 with self.tp(raw, 100) as f:
1566 f.write(b"1")
1567 self.assertEqual(f.read(1), b'b')
1568 f.write(b'2')
1569 self.assertEqual(f.read1(1), b'd')
1570 f.write(b'3')
1571 buf = bytearray(1)
1572 f.readinto(buf)
1573 self.assertEqual(buf, b'f')
1574 f.write(b'4')
1575 self.assertEqual(f.peek(1), b'h')
1576 f.flush()
1577 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1578
1579 with self.BytesIO(b'abc') as raw:
1580 with self.tp(raw, 100) as f:
1581 self.assertEqual(f.read(1), b'a')
1582 f.write(b"2")
1583 self.assertEqual(f.read(1), b'c')
1584 f.flush()
1585 self.assertEqual(raw.getvalue(), b'a2c')
1586
1587 def test_interleaved_readline_write(self):
1588 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1589 with self.tp(raw) as f:
1590 f.write(b'1')
1591 self.assertEqual(f.readline(), b'b\n')
1592 f.write(b'2')
1593 self.assertEqual(f.readline(), b'def\n')
1594 f.write(b'3')
1595 self.assertEqual(f.readline(), b'\n')
1596 f.flush()
1597 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1598
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reader's check first, then the writer's.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1616
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against pyio.BufferedRandom."""

    tp = pyio.BufferedRandom
1619
1620
Christian Heimes1a6387e2008-03-26 12:49:49 +00001621# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1622# properties:
1623# - A single output character can correspond to many bytes of input.
1624# - The number of input bytes to complete the character can be
1625# undetermined until the last input byte is received.
1626# - The number of input bytes can vary depending on previous input.
1627# - A single input byte can correspond to many characters of output.
1628# - The number of output characters can be undetermined until the
1629# last input byte is received.
1630# - The number of output characters can vary depending on previous input.
1631
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I == O == 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags); flags packs I and O as i*100 + o."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of every word
        completed by them.  With *final* true, a pending partial word is
        completed as well."""
        output = ''
        period = ord('.')
        # Iterate over integer byte values rather than one-character
        # strings: the terminator test then never compares a byte string
        # with a text literal, and it matches the integer comparisons
        # already used in process_word().
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word: update I or O for a control word,
        otherwise return the formatted output word.  The buffer is emptied
        in either case."""
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Lookups succeed only while a test has switched this on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'
        while codecEnabled is true, and None otherwise."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1716
# Make StatefulIncrementalDecoder reachable through the codec registry.
# Its lookup function only answers while codecEnabled is set, so the codec
# stays disabled until a test switches it on.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1720
1721
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw_input, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw_input, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001764
1765class TextIOWrapperTest(unittest.TestCase):
1766
def setUp(self):
    # Start from a clean slate: no leftover test file on disk.
    support.unlink(support.TESTFN)
    # Mixed line endings, and the same text with every ending as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001771
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001774
Antoine Pitrou19690592009-06-12 20:14:08 +00001775 def test_constructor(self):
1776 r = self.BytesIO(b"\xc3\xa9\n\n")
1777 b = self.BufferedReader(r, 1000)
1778 t = self.TextIOWrapper(b)
1779 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001780 self.assertEqual(t.encoding, "latin1")
1781 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001782 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001783 self.assertEqual(t.encoding, "utf8")
1784 self.assertEqual(t.line_buffering, True)
1785 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001786 self.assertRaises(TypeError, t.__init__, b, newline=42)
1787 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1788
1789 def test_detach(self):
1790 r = self.BytesIO()
1791 b = self.BufferedWriter(r)
1792 t = self.TextIOWrapper(b)
1793 self.assertIs(t.detach(), b)
1794
1795 t = self.TextIOWrapper(b, encoding="ascii")
1796 t.write("howdy")
1797 self.assertFalse(r.getvalue())
1798 t.detach()
1799 self.assertEqual(r.getvalue(), b"howdy")
1800 self.assertRaises(ValueError, t.detach)
1801
1802 def test_repr(self):
1803 raw = self.BytesIO("hello".encode("utf-8"))
1804 b = self.BufferedReader(raw)
1805 t = self.TextIOWrapper(b, encoding="utf-8")
1806 modname = self.TextIOWrapper.__module__
1807 self.assertEqual(repr(t),
1808 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1809 raw.name = "dummy"
1810 self.assertEqual(repr(t),
1811 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1812 raw.name = b"dummy"
1813 self.assertEqual(repr(t),
1814 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1815
1816 def test_line_buffering(self):
1817 r = self.BytesIO()
1818 b = self.BufferedWriter(r, 1000)
1819 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1820 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001821 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001822 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001823 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001824 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001825 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001826
Antoine Pitrou19690592009-06-12 20:14:08 +00001827 def test_encoding(self):
1828 # Check the encoding attribute is always set, and valid
1829 b = self.BytesIO()
1830 t = self.TextIOWrapper(b, encoding="utf8")
1831 self.assertEqual(t.encoding, "utf8")
1832 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001833 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001834 codecs.lookup(t.encoding)
1835
1836 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001837 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001838 b = self.BytesIO(b"abc\n\xff\n")
1839 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001840 self.assertRaises(UnicodeError, t.read)
1841 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001842 b = self.BytesIO(b"abc\n\xff\n")
1843 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001844 self.assertRaises(UnicodeError, t.read)
1845 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001846 b = self.BytesIO(b"abc\n\xff\n")
1847 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001848 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001849 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 b = self.BytesIO(b"abc\n\xff\n")
1851 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001852 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001853
Antoine Pitrou19690592009-06-12 20:14:08 +00001854 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001855 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001856 b = self.BytesIO()
1857 t = self.TextIOWrapper(b, encoding="ascii")
1858 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001860 b = self.BytesIO()
1861 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1862 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001863 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001864 b = self.BytesIO()
1865 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001866 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001867 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001868 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001869 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001870 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001871 b = self.BytesIO()
1872 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001873 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001874 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001875 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001876 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001877
Antoine Pitrou19690592009-06-12 20:14:08 +00001878 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001879 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1880
1881 tests = [
1882 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1883 [ '', input_lines ],
1884 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1885 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1886 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1887 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001888 encodings = (
1889 'utf-8', 'latin-1',
1890 'utf-16', 'utf-16-le', 'utf-16-be',
1891 'utf-32', 'utf-32-le', 'utf-32-be',
1892 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001893
1894 # Try a range of buffer sizes to test the case where \r is the last
1895 # character in TextIOWrapper._pending_line.
1896 for encoding in encodings:
1897 # XXX: str.encode() should return bytes
1898 data = bytes(''.join(input_lines).encode(encoding))
1899 for do_reads in (False, True):
1900 for bufsize in range(1, 10):
1901 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001902 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1903 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001904 encoding=encoding)
1905 if do_reads:
1906 got_lines = []
1907 while True:
1908 c2 = textio.read(2)
1909 if c2 == '':
1910 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001911 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001912 got_lines.append(c2 + textio.readline())
1913 else:
1914 got_lines = list(textio)
1915
1916 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001917 self.assertEqual(got_line, exp_line)
1918 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001919
Antoine Pitrou19690592009-06-12 20:14:08 +00001920 def test_newlines_input(self):
1921 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001922 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1923 for newline, expected in [
1924 (None, normalized.decode("ascii").splitlines(True)),
1925 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001926 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1927 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1928 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001929 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001930 buf = self.BytesIO(testdata)
1931 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001932 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001933 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001934 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935
Antoine Pitrou19690592009-06-12 20:14:08 +00001936 def test_newlines_output(self):
1937 testdict = {
1938 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1939 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1940 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1941 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1942 }
1943 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1944 for newline, expected in tests:
1945 buf = self.BytesIO()
1946 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1947 txt.write("AAA\nB")
1948 txt.write("BB\nCCC\n")
1949 txt.write("X\rY\r\nZ")
1950 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001951 self.assertEqual(buf.closed, False)
1952 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001953
def test_destructor(self):
    # Dropping the last reference to a wrapper must flush pending text and
    # close the underlying stream.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had reached the stream when it was closed.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    t = self.TextIOWrapper(raw, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Antoine Pitrou19690592009-06-12 20:14:08 +00001967
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # called in that order when the object is destroyed.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(MyTextIO, self), '__del__', None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            record.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    t = MyTextIO(raw, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
1990
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002005
2006 # Systematic tests of the text I/O API
2007
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings.  The files are opened in `with` blocks so that a
    # failing assertion cannot leave a handle open while tearDown()
    # removes the file.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused as a seek target below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2036
def multi_line_test(self, f, enc):
    # Empty the file, write lines of many lengths while recording tell()
    # before each one, then read them back and compare both the
    # positions and the text.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        line = "".join(sample[k % period] for k in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    pos = f.tell()
    line = f.readline()
    while line:
        rlines.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002058
def test_telling(self):
    # tell() must report the recorded positions when lines are read back
    # with readline(), and must be refused while iterating.  The file is
    # opened in a `with` block so that it is closed even when an
    # assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail inside the iteration.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2078
def test_seeking(self):
    # Place a multi-byte character two bytes before the default chunk
    # size and check read()/tell()/readline() around it.  Both handles
    # are opened in `with` blocks; the reading handle used to be left
    # open.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002096
def test_seeking_too(self):
    # Regression test for a specific bug
    # Both handles are opened in `with` blocks; the reading handle used
    # to be left open.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2108
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every handle is opened in a `with` block so that a failing
        # assertion cannot leave the file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Christian Heimes1a6387e2008-03-26 12:49:49 +00002155
Antoine Pitrou19690592009-06-12 20:14:08 +00002156 def test_encoded_writes(self):
2157 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002158 tests = ("utf-16",
2159 "utf-16-le",
2160 "utf-16-be",
2161 "utf-32",
2162 "utf-32-le",
2163 "utf-32-be")
2164 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002165 buf = self.BytesIO()
2166 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002167 # Check if the BOM is written only once (see issue1753).
2168 f.write(data)
2169 f.write(data)
2170 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002171 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002172 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002173 self.assertEqual(f.read(), data * 2)
2174 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002175
Antoine Pitrou19690592009-06-12 20:14:08 +00002176 def test_unreadable(self):
2177 class UnReadable(self.BytesIO):
2178 def readable(self):
2179 return False
2180 txt = self.TextIOWrapper(UnReadable())
2181 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002182
def test_read_one_by_one(self):
    """Reading one character at a time still translates "\\r\\n" to "\\n"."""
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read (end of stream).
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002192
def test_readlines(self):
    """readlines() with no hint, a None hint and a size hint of 5."""
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]

    self.assertEqual(wrapper.readlines(), every_line)

    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), every_line)

    # With a hint of 5 only the first two lines are returned.
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), every_line[:2])
2200
Christian Heimes1a6387e2008-03-26 12:49:49 +00002201 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    """Read in 128-character chunks with "\\r\\n" straddling the boundary."""
    # 127 filler bytes put "\r" at index 127 and "\n" at index 128.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002212
def test_issue1395_1(self):
    """Reading self.testdata one character at a time gives self.normalized."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    # Stop at the first empty read, i.e. at end of stream.
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002224
def test_issue1395_2(self):
    """Read in blocks of 4 with the wrapper's _CHUNK_SIZE also set to 4."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002236
def test_issue1395_3(self):
    """Mix two read(4) calls with three readline() calls (_CHUNK_SIZE = 4)."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002247
def test_issue1395_4(self):
    """A read(4) followed by an unbounded read() gives self.normalized."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255
def test_issue1395_5(self):
    """tell() after a read(4) gives a position seek() can return to."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Consume the first four characters; only the resulting position matters.
    wrapper.read(4)
    bookmark = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(bookmark)
    self.assertEqual(wrapper.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002265
def test_issue2282(self):
    """The wrapper reports the same seekable() value as its buffer."""
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2271
def test_append_bom(self):
    """The BOM is not written again when appending to a non-empty file."""
    path = support.TESTFN

    def raw_contents():
        # Return the file's bytes exactly as stored on disk.
        with self.open(path, 'rb') as stream:
            return stream.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            # The position itself is not used; the call is kept as-is.
            writer.tell()
        self.assertEqual(raw_contents(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as appender:
            appender.write('xxx')
        self.assertEqual(raw_contents(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002286
def test_seek_bom(self):
    """The BOM is not duplicated when seeking manually before writing."""
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()

        with self.open(path, 'r+', encoding=charset) as updater:
            # First write past the existing text, then overwrite the start.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')

        with self.open(path, 'rb') as raw:
            on_disk = raw.read()
        self.assertEqual(on_disk, 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002301
def test_errors_property(self):
    """The errors attribute is "strict" by default, else the value passed."""
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
2307
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    """Issue6750: concurrent writes could duplicate data."""
    worker_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def worker(index):
            line = "Thread%03d\n" % index
            # Block until every thread is started so the writes overlap.
            go.wait()
            out.write(line)

        workers = [threading.Thread(target=worker, args=(index,))
                   for index in range(worker_count)]
        for thread in workers:
            thread.start()
        time.sleep(0.02)
        go.set()
        for thread in workers:
            thread.join()

    with self.open(support.TESTFN) as result:
        content = result.read()
    # Each thread's line must appear exactly once.
    for index in range(worker_count):
        self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002329
def test_flush_error_on_close(self):
    """An IOError raised by flush() during close() is not swallowed."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    def failing_flush():
        raise IOError()

    wrapper.flush = failing_flush
    with self.assertRaises(IOError):
        wrapper.close()
2336
def test_multi_close(self):
    """close() may be called repeatedly; flush() afterwards raises."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2343
def test_readonly_attributes(self):
    """Assigning to the wrapper's buffer attribute is rejected."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    # Either exception type is accepted.
    with self.assertRaises((AttributeError, TypeError)):
        wrapper.buffer = replacement
2349
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests that only apply to the C implementation."""

    def test_initialization(self):
        """A failed re-initialisation leaves the wrapper refusing reads."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42),
                                      (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so only the cyclic GC can free the object.
        wrapper.x = wrapper
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            flushed = f.read()
        self.assertEqual(flushed, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        def make_wrapper():
            pair = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            return self.TextIOWrapper(pair, encoding="ascii")

        for _ in range(1000):
            first = make_wrapper()
            second = make_wrapper()
            # circular references
            first.buddy = second
            second.buddy = first
        support.gc_collect()
2390
2391
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the shared TextIOWrapper tests unchanged (no extra tests here)."""
2394
2395
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for self.IncrementalNewlineDecoder (set on the subclasses)."""

    def check_newline_decoding_utf8(self, decoder):
        """UTF-8 specific tests for a newline decoder."""
        def verify(raw, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding the same input from the same state must give the
            # same result twice.
            snapshot = decoder.getstate()
            first = decoder.decode(raw, **kwargs)
            decoder.setstate(snapshot)
            second = decoder.decode(raw, **kwargs)
            self.assertEqual(first, expected)
            self.assertEqual(second, expected)

        def verify_all(steps):
            for raw, expected in steps:
                verify(raw, expected)

        verify_all([
            # A complete three-byte sequence in one call.
            (b'\xe8\xa2\x88', "\u8888"),
            # The same character fed one byte at a time, twice over.
            (b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"),
            (b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"),
            # A lone lead byte left pending.
            (b'\xe8', ""),
        ])
        # Finalising with an incomplete sequence pending must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        verify(b'\n', "\n")
        verify(b'\r', "")
        verify(b'', "\n", final=True)
        verify(b'\r', "\n", final=True)

        verify_all([
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        """Feed text one unit at a time and track decoder.newlines.

        With an *encoding*, the text is encoded first and fed bytewise;
        with ``None`` it is fed to the decoder one character at a time.
        """
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            units = text if encoder is None else encoder.encode(text)
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # Each entry: text to feed, then the newlines value expected after.
        expectations = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in expectations:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must have forgotten the newlines seen.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        """Run the generic checks per encoding, then the UTF-8 ones."""
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner,
                                                      translate=True)
        self.check_newline_decoding_utf8(utf8_wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            # U+0D00 and U+0A00 must pass through without being recorded
            # as newlines.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2501
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared newline decoder tests unchanged (no extra tests here)."""
2504
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared newline decoder tests unchanged (no extra tests here)."""
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002507
Christian Heimes1a6387e2008-03-26 12:49:49 +00002508
2509# XXX Tests for open()
2510
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of an io implementation.

    Subclasses set ``io`` to the module under test; the other attributes
    used here (``open``, ``IOBase``, ``BlockingIOError``, ...) are read from
    ``self`` as well.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check the mode and name attributes at each layer of open()."""
        # Every file is opened in a ``with`` block so that a failing
        # assertion cannot leave it open when tearDown() unlinks TESTFN.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # closefd=False: g shares f's descriptor without owning it.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Methods that only exist on some layers are probed first.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Write bytes to binary files and text to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)

        class C(unicode):
            pass
        # Build a reference cycle between the exception and its argument
        # and check that the garbage collector can free it.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of opened file matches.

        *abcmodule* is any object exposing ``IOBase``, ``RawIOBase``,
        ``BufferedIOBase`` and ``TextIOBase``.
        """
        # Unbuffered binary file: raw layer only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Buffered binary file.
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Text file.
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the status flags of descriptor *fd*."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe and check nothing is lost.

        *bufsize* is the ``buffering`` value used for both pipe ends.
        Everything recorded as sent must equal everything received.
        """
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        # Going through bytearray gives one letter byte.
                        # On Python 2, bytes([x]) is str() of a list.
                        msg = bytes(bytearray([i % 26 + 97])) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Only part of the last message was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Drain the reader until flush() stops reporting EAGAIN.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # rf.read() returns None once no more data is available.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
2710
class CMiscIOTest(MiscIOTest):
    """Run the MiscIOTest tests with the ``io`` module."""
    io = io
2713
class PyMiscIOTest(MiscIOTest):
    """Run the MiscIOTest tests with the ``pyio`` module."""
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002716
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002717
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check io reads and writes that are interrupted by SIGALRM.

    Subclasses set ``io`` to the module under test.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm still pending (e.g. after an early failure) so it
        # cannot fire once the previous handler is back in place.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        """SIGALRM handler that deliberately raises ZeroDivisionError."""
        1 // 0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                     'issue #12429: skip test on FreeBSD <= 7')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the data repeated in the write; *bytes* holds the bytes
        expected at the read end; *fdopen_kwargs* are passed to open().
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound before the try block so the cleanup below stays valid even
        # if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() re-entered from a signal handler.

        The SIGALRM handler writes *data* to the stream the main loop is
        writing to, then raises ZeroDivisionError.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1//0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to text for the
        comparison; *fdopen_kwargs* are passed to open().
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so the cleanup below stays valid even
        # if open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupterd_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupterd_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the data repeated N times in a single write() call;
        *fdopen_kwargs* are passed to open().
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # The first alarm only re-arms the timer with the second handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # The second alarm starts the reader so the write can complete.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the cleanup below stays valid even
        # if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupterd_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupterd_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2905
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002906
class CSignalsTest(SignalsTest):
    """Run the SignalsTest tests with the ``io`` module."""
    io = io
2909
class PySignalsTest(SignalsTest):
    """Run the SignalsTest tests with the ``pyio`` module."""
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
2917
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002918
def test_main():
    """Inject the io / pyio namespaces into the test classes and run them.

    Every test class whose name starts with "C" receives the public names
    of the ``io`` module plus the "C"-prefixed mock classes as class
    attributes; classes whose name starts with "Py" receive the ``pyio``
    equivalents.  Classes with any other prefix are left untouched.  All
    classes are then handed to ``support.run_unittest``.
    """
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Mock base classes; each has a "C"- and a "Py"-prefixed concrete
    # variant living in this module's globals.
    mocks = (
        MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
        MockNonBlockWriterIO, MockRawIOWithoutRead,
    )

    exported = io.__all__ + ["IncrementalNewlineDecoder"]

    # Public names of each implementation, looked up in the same order as
    # before (all of io first, then all of pyio).
    c_namespace = {}
    for member in exported:
        c_namespace[member] = getattr(io, member)
    py_namespace = {}
    for member in exported:
        py_namespace[member] = getattr(pyio, member)

    # Expose the implementation-specific mocks under their unprefixed names.
    module_globals = globals()
    for mock in mocks:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_namespace
        elif test_name.startswith("Py"):
            namespace = py_namespace
        else:
            # Implementation-agnostic test class: nothing to inject.
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002953
# Run the whole io test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()