blob: bc59af399d130f8ca37b596b1228a27b88f50533 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
Antoine Pitrou78e761e2012-10-16 22:57:11 +020037from UserList import UserList
Antoine Pitrou19690592009-06-12 20:14:08 +000038from test import test_support as support
Serhiy Storchaka354d50e2013-02-03 17:10:42 +020039import contextlib
Christian Heimes1a6387e2008-03-26 12:49:49 +000040
41import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000044try:
45 import threading
46except ImportError:
47 threading = None
Antoine Pitrou5aa7df32011-11-21 20:16:44 +010048try:
49 import fcntl
50except ImportError:
51 fcntl = None
Antoine Pitrou19690592009-06-12 20:14:08 +000052
53__metaclass__ = type
54bytes = support.py3k_bytes
55
56def _default_chunk_size():
57 """Get the default TextIOWrapper chunk size"""
58 with io.open(__file__, "r", encoding="latin1") as f:
59 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000060
61
Antoine Pitrou6391b342010-09-14 18:48:19 +000062class MockRawIOWithoutRead:
63 """A RawIO implementation without read(), so as to exercise the default
64 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
66 def __init__(self, read_stack=()):
67 self._read_stack = list(read_stack)
68 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000069 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000070 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000071
Christian Heimes1a6387e2008-03-26 12:49:49 +000072 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000073 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000074 return len(b)
75
76 def writable(self):
77 return True
78
79 def fileno(self):
80 return 42
81
82 def readable(self):
83 return True
84
85 def seekable(self):
86 return True
87
88 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000089 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000090
91 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000092 return 0 # same comment as above
93
94 def readinto(self, buf):
95 self._reads += 1
96 max_len = len(buf)
97 try:
98 data = self._read_stack[0]
99 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000100 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +0000101 return 0
102 if data is None:
103 del self._read_stack[0]
104 return None
105 n = len(data)
106 if len(data) <= max_len:
107 del self._read_stack[0]
108 buf[:n] = data
109 return n
110 else:
111 buf[:] = data[:max_len]
112 self._read_stack[0] = data[max_len:]
113 return max_len
114
115 def truncate(self, pos=None):
116 return pos
117
Antoine Pitrou6391b342010-09-14 18:48:19 +0000118class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
119 pass
120
121class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
122 pass
123
124
125class MockRawIO(MockRawIOWithoutRead):
126
127 def read(self, n=None):
128 self._reads += 1
129 try:
130 return self._read_stack.pop(0)
131 except:
132 self._extraneous_reads += 1
133 return b""
134
Antoine Pitrou19690592009-06-12 20:14:08 +0000135class CMockRawIO(MockRawIO, io.RawIOBase):
136 pass
137
138class PyMockRawIO(MockRawIO, pyio.RawIOBase):
139 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000140
141
Antoine Pitrou19690592009-06-12 20:14:08 +0000142class MisbehavedRawIO(MockRawIO):
143 def write(self, b):
144 return MockRawIO.write(self, b) * 2
145
146 def read(self, n=None):
147 return MockRawIO.read(self, n) * 2
148
149 def seek(self, pos, whence):
150 return -123
151
152 def tell(self):
153 return -456
154
155 def readinto(self, buf):
156 MockRawIO.readinto(self, buf)
157 return len(buf) * 5
158
159class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
160 pass
161
162class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
163 pass
164
165
166class CloseFailureIO(MockRawIO):
167 closed = 0
168
169 def close(self):
170 if not self.closed:
171 self.closed = 1
172 raise IOError
173
174class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
175 pass
176
177class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
178 pass
179
180
181class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000182
183 def __init__(self, data):
184 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000185 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
187 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000188 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000189 self.read_history.append(None if res is None else len(res))
190 return res
191
Antoine Pitrou19690592009-06-12 20:14:08 +0000192 def readinto(self, b):
193 res = super(MockFileIO, self).readinto(b)
194 self.read_history.append(res)
195 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000196
Antoine Pitrou19690592009-06-12 20:14:08 +0000197class CMockFileIO(MockFileIO, io.BytesIO):
198 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000199
Antoine Pitrou19690592009-06-12 20:14:08 +0000200class PyMockFileIO(MockFileIO, pyio.BytesIO):
201 pass
202
203
204class MockNonBlockWriterIO:
205
206 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000207 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000208 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000209
Antoine Pitrou19690592009-06-12 20:14:08 +0000210 def pop_written(self):
211 s = b"".join(self._write_stack)
212 self._write_stack[:] = []
213 return s
214
215 def block_on(self, char):
216 """Block when a given char is encountered."""
217 self._blocker_char = char
218
219 def readable(self):
220 return True
221
222 def seekable(self):
223 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000224
225 def writable(self):
226 return True
227
Antoine Pitrou19690592009-06-12 20:14:08 +0000228 def write(self, b):
229 b = bytes(b)
230 n = -1
231 if self._blocker_char:
232 try:
233 n = b.index(self._blocker_char)
234 except ValueError:
235 pass
236 else:
Antoine Pitrou5aa7df32011-11-21 20:16:44 +0100237 if n > 0:
238 # write data up to the first blocker
239 self._write_stack.append(b[:n])
240 return n
241 else:
242 # cancel blocker and indicate would block
243 self._blocker_char = None
244 return None
Antoine Pitrou19690592009-06-12 20:14:08 +0000245 self._write_stack.append(b)
246 return len(b)
247
248class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
249 BlockingIOError = io.BlockingIOError
250
251class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
252 BlockingIOError = pyio.BlockingIOError
253
Christian Heimes1a6387e2008-03-26 12:49:49 +0000254
255class IOTest(unittest.TestCase):
256
Antoine Pitrou19690592009-06-12 20:14:08 +0000257 def setUp(self):
258 support.unlink(support.TESTFN)
259
Christian Heimes1a6387e2008-03-26 12:49:49 +0000260 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000261 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000262
263 def write_ops(self, f):
264 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000265 f.truncate(0)
266 self.assertEqual(f.tell(), 5)
267 f.seek(0)
268
269 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.seek(0), 0)
271 self.assertEqual(f.write(b"Hello."), 6)
272 self.assertEqual(f.tell(), 6)
273 self.assertEqual(f.seek(-1, 1), 5)
274 self.assertEqual(f.tell(), 5)
275 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
276 self.assertEqual(f.seek(0), 0)
277 self.assertEqual(f.write(b"h"), 1)
278 self.assertEqual(f.seek(-1, 2), 13)
279 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000280
Christian Heimes1a6387e2008-03-26 12:49:49 +0000281 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000282 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000283 self.assertRaises(TypeError, f.seek, 0.0)
284
285 def read_ops(self, f, buffered=False):
286 data = f.read(5)
287 self.assertEqual(data, b"hello")
288 data = bytearray(data)
289 self.assertEqual(f.readinto(data), 5)
290 self.assertEqual(data, b" worl")
291 self.assertEqual(f.readinto(data), 2)
292 self.assertEqual(len(data), 5)
293 self.assertEqual(data[:2], b"d\n")
294 self.assertEqual(f.seek(0), 0)
295 self.assertEqual(f.read(20), b"hello world\n")
296 self.assertEqual(f.read(1), b"")
297 self.assertEqual(f.readinto(bytearray(b"x")), 0)
298 self.assertEqual(f.seek(-6, 2), 6)
299 self.assertEqual(f.read(5), b"world")
300 self.assertEqual(f.read(0), b"")
301 self.assertEqual(f.readinto(bytearray()), 0)
302 self.assertEqual(f.seek(-6, 1), 5)
303 self.assertEqual(f.read(5), b" worl")
304 self.assertEqual(f.tell(), 10)
305 self.assertRaises(TypeError, f.seek, 0.0)
306 if buffered:
307 f.seek(0)
308 self.assertEqual(f.read(), b"hello world\n")
309 f.seek(6)
310 self.assertEqual(f.read(), b"world\n")
311 self.assertEqual(f.read(), b"")
312
313 LARGE = 2**31
314
315 def large_file_ops(self, f):
316 assert f.readable()
317 assert f.writable()
318 self.assertEqual(f.seek(self.LARGE), self.LARGE)
319 self.assertEqual(f.tell(), self.LARGE)
320 self.assertEqual(f.write(b"xxx"), 3)
321 self.assertEqual(f.tell(), self.LARGE + 3)
322 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
323 self.assertEqual(f.truncate(), self.LARGE + 2)
324 self.assertEqual(f.tell(), self.LARGE + 2)
325 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
326 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000328 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
329 self.assertEqual(f.seek(-1, 2), self.LARGE)
330 self.assertEqual(f.read(2), b"x")
331
Antoine Pitrou19690592009-06-12 20:14:08 +0000332 def test_invalid_operations(self):
333 # Try writing on a file opened in read mode and vice-versa.
334 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000335 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000336 self.assertRaises(IOError, fp.read)
337 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000338 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000339 self.assertRaises(IOError, fp.write, b"blah")
340 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000341 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000342 self.assertRaises(IOError, fp.write, "blah")
343 self.assertRaises(IOError, fp.writelines, ["blah\n"])
344
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000346 with self.open(support.TESTFN, "wb", buffering=0) as f:
347 self.assertEqual(f.readable(), False)
348 self.assertEqual(f.writable(), True)
349 self.assertEqual(f.seekable(), True)
350 self.write_ops(f)
351 with self.open(support.TESTFN, "rb", buffering=0) as f:
352 self.assertEqual(f.readable(), True)
353 self.assertEqual(f.writable(), False)
354 self.assertEqual(f.seekable(), True)
355 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000356
357 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000358 with self.open(support.TESTFN, "wb") as f:
359 self.assertEqual(f.readable(), False)
360 self.assertEqual(f.writable(), True)
361 self.assertEqual(f.seekable(), True)
362 self.write_ops(f)
363 with self.open(support.TESTFN, "rb") as f:
364 self.assertEqual(f.readable(), True)
365 self.assertEqual(f.writable(), False)
366 self.assertEqual(f.seekable(), True)
367 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000368
369 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000370 with self.open(support.TESTFN, "wb") as f:
371 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
372 with self.open(support.TESTFN, "rb") as f:
373 self.assertEqual(f.readline(), b"abc\n")
374 self.assertEqual(f.readline(10), b"def\n")
375 self.assertEqual(f.readline(2), b"xy")
376 self.assertEqual(f.readline(4), b"zzy\n")
377 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000378 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000379 self.assertRaises(TypeError, f.readline, 5.3)
380 with self.open(support.TESTFN, "r") as f:
381 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000382
383 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000385 self.write_ops(f)
386 data = f.getvalue()
387 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000388 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000389 self.read_ops(f, True)
390
391 def test_large_file_ops(self):
392 # On Windows and Mac OSX this test comsumes large resources; It takes
393 # a long time to build the >2GB file and takes >2GB of disk space
394 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000395 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware1f702212013-12-10 14:09:20 -0600396 support.requires(
397 'largefile',
398 'test requires %s bytes and a long time to run' % self.LARGE)
Antoine Pitrou19690592009-06-12 20:14:08 +0000399 with self.open(support.TESTFN, "w+b", 0) as f:
400 self.large_file_ops(f)
401 with self.open(support.TESTFN, "w+b") as f:
402 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000403
404 def test_with_open(self):
405 for bufsize in (0, 1, 100):
406 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000407 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 f.write(b"xxx")
409 self.assertEqual(f.closed, True)
410 f = None
411 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000412 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000413 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000414 except ZeroDivisionError:
415 self.assertEqual(f.closed, True)
416 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000417 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000418
Antoine Pitroue741cc62009-01-21 00:45:36 +0000419 # issue 5008
420 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000422 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000423 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000424 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000425 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000426 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000427 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000428 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000429
Christian Heimes1a6387e2008-03-26 12:49:49 +0000430 def test_destructor(self):
431 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000432 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000433 def __del__(self):
434 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000435 try:
436 f = super(MyFileIO, self).__del__
437 except AttributeError:
438 pass
439 else:
440 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000441 def close(self):
442 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000443 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000444 def flush(self):
445 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000446 super(MyFileIO, self).flush()
447 f = MyFileIO(support.TESTFN, "wb")
448 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000449 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000450 support.gc_collect()
451 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000452 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000453 self.assertEqual(f.read(), b"xxx")
454
455 def _check_base_destructor(self, base):
456 record = []
457 class MyIO(base):
458 def __init__(self):
459 # This exercises the availability of attributes on object
460 # destruction.
461 # (in the C version, close() is called by the tp_dealloc
462 # function, not by __del__)
463 self.on_del = 1
464 self.on_close = 2
465 self.on_flush = 3
466 def __del__(self):
467 record.append(self.on_del)
468 try:
469 f = super(MyIO, self).__del__
470 except AttributeError:
471 pass
472 else:
473 f()
474 def close(self):
475 record.append(self.on_close)
476 super(MyIO, self).close()
477 def flush(self):
478 record.append(self.on_flush)
479 super(MyIO, self).flush()
480 f = MyIO()
481 del f
482 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000483 self.assertEqual(record, [1, 2, 3])
484
Antoine Pitrou19690592009-06-12 20:14:08 +0000485 def test_IOBase_destructor(self):
486 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000487
Antoine Pitrou19690592009-06-12 20:14:08 +0000488 def test_RawIOBase_destructor(self):
489 self._check_base_destructor(self.RawIOBase)
490
491 def test_BufferedIOBase_destructor(self):
492 self._check_base_destructor(self.BufferedIOBase)
493
494 def test_TextIOBase_destructor(self):
495 self._check_base_destructor(self.TextIOBase)
496
497 def test_close_flushes(self):
498 with self.open(support.TESTFN, "wb") as f:
499 f.write(b"xxx")
500 with self.open(support.TESTFN, "rb") as f:
501 self.assertEqual(f.read(), b"xxx")
502
503 def test_array_writes(self):
504 a = array.array(b'i', range(10))
505 n = len(a.tostring())
506 with self.open(support.TESTFN, "wb", 0) as f:
507 self.assertEqual(f.write(a), n)
508 with self.open(support.TESTFN, "wb") as f:
509 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000510
511 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000513 closefd=False)
514
Antoine Pitrou19690592009-06-12 20:14:08 +0000515 def test_read_closed(self):
516 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000517 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000518 with self.open(support.TESTFN, "r") as f:
519 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000520 self.assertEqual(file.read(), "egg\n")
521 file.seek(0)
522 file.close()
523 self.assertRaises(ValueError, file.read)
524
525 def test_no_closefd_with_filename(self):
526 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000527 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000528
529 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000530 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000531 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000532 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000533 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000534 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000535 self.assertEqual(file.buffer.raw.closefd, False)
536
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 def test_garbage_collection(self):
538 # FileIO objects are collected, and collecting them flushes
539 # all data to disk.
540 f = self.FileIO(support.TESTFN, "wb")
541 f.write(b"abcxxx")
542 f.f = f
543 wr = weakref.ref(f)
544 del f
545 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000546 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000547 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000548 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000549
Antoine Pitrou19690592009-06-12 20:14:08 +0000550 def test_unbounded_file(self):
551 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
552 zero = "/dev/zero"
553 if not os.path.exists(zero):
554 self.skipTest("{0} does not exist".format(zero))
555 if sys.maxsize > 0x7FFFFFFF:
556 self.skipTest("test can only run in a 32-bit address space")
557 if support.real_max_memuse < support._2G:
558 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000559 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000560 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000561 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000562 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000563 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000564 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000565
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000566 def test_flush_error_on_close(self):
567 f = self.open(support.TESTFN, "wb", buffering=0)
568 def bad_flush():
569 raise IOError()
570 f.flush = bad_flush
571 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600572 self.assertTrue(f.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000573
574 def test_multi_close(self):
575 f = self.open(support.TESTFN, "wb", buffering=0)
576 f.close()
577 f.close()
578 f.close()
579 self.assertRaises(ValueError, f.flush)
580
Antoine Pitrou6391b342010-09-14 18:48:19 +0000581 def test_RawIOBase_read(self):
582 # Exercise the default RawIOBase.read() implementation (which calls
583 # readinto() internally).
584 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
585 self.assertEqual(rawio.read(2), b"ab")
586 self.assertEqual(rawio.read(2), b"c")
587 self.assertEqual(rawio.read(2), b"d")
588 self.assertEqual(rawio.read(2), None)
589 self.assertEqual(rawio.read(2), b"ef")
590 self.assertEqual(rawio.read(2), b"g")
591 self.assertEqual(rawio.read(2), None)
592 self.assertEqual(rawio.read(2), b"")
593
Hynek Schlawack877effc2012-05-25 09:24:18 +0200594 def test_fileio_closefd(self):
595 # Issue #4841
596 with self.open(__file__, 'rb') as f1, \
597 self.open(__file__, 'rb') as f2:
598 fileio = self.FileIO(f1.fileno(), closefd=False)
599 # .__init__() must not close f1
600 fileio.__init__(f2.fileno(), closefd=False)
601 f1.readline()
602 # .close() must not close f2
603 fileio.close()
604 f2.readline()
605
606
Antoine Pitrou19690592009-06-12 20:14:08 +0000607class CIOTest(IOTest):
Antoine Pitrou16166452011-07-12 22:04:20 +0200608
609 def test_IOBase_finalize(self):
610 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
611 # class which inherits IOBase and an object of this class are caught
612 # in a reference cycle and close() is already in the method cache.
613 class MyIO(self.IOBase):
614 def close(self):
615 pass
616
617 # create an instance to populate the method cache
618 MyIO()
619 obj = MyIO()
620 obj.obj = obj
621 wr = weakref.ref(obj)
622 del MyIO
623 del obj
624 support.gc_collect()
625 self.assertTrue(wr() is None, wr)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000626
Antoine Pitrou19690592009-06-12 20:14:08 +0000627class PyIOTest(IOTest):
628 test_array_writes = unittest.skip(
629 "len(array.array) returns number of elements rather than bytelength"
630 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000631
632
Antoine Pitrou19690592009-06-12 20:14:08 +0000633class CommonBufferedTests:
634 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
635
636 def test_detach(self):
637 raw = self.MockRawIO()
638 buf = self.tp(raw)
639 self.assertIs(buf.detach(), raw)
640 self.assertRaises(ValueError, buf.detach)
641
642 def test_fileno(self):
643 rawio = self.MockRawIO()
644 bufio = self.tp(rawio)
645
Ezio Melotti2623a372010-11-21 13:34:58 +0000646 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000647
Zachary Ware1f702212013-12-10 14:09:20 -0600648 @unittest.skip('test having existential crisis')
Antoine Pitrou19690592009-06-12 20:14:08 +0000649 def test_no_fileno(self):
650 # XXX will we always have fileno() function? If so, kill
651 # this test. Else, write it.
652 pass
653
654 def test_invalid_args(self):
655 rawio = self.MockRawIO()
656 bufio = self.tp(rawio)
657 # Invalid whence
658 self.assertRaises(ValueError, bufio.seek, 0, -1)
659 self.assertRaises(ValueError, bufio.seek, 0, 3)
660
661 def test_override_destructor(self):
662 tp = self.tp
663 record = []
664 class MyBufferedIO(tp):
665 def __del__(self):
666 record.append(1)
667 try:
668 f = super(MyBufferedIO, self).__del__
669 except AttributeError:
670 pass
671 else:
672 f()
673 def close(self):
674 record.append(2)
675 super(MyBufferedIO, self).close()
676 def flush(self):
677 record.append(3)
678 super(MyBufferedIO, self).flush()
679 rawio = self.MockRawIO()
680 bufio = MyBufferedIO(rawio)
681 writable = bufio.writable()
682 del bufio
683 support.gc_collect()
684 if writable:
685 self.assertEqual(record, [1, 2, 3])
686 else:
687 self.assertEqual(record, [1, 2])
688
689 def test_context_manager(self):
690 # Test usability as a context manager
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693 def _with():
694 with bufio:
695 pass
696 _with()
697 # bufio should now be closed, and using it a second time should raise
698 # a ValueError.
699 self.assertRaises(ValueError, _with)
700
701 def test_error_through_destructor(self):
702 # Test that the exception state is not modified by a destructor,
703 # even if close() fails.
704 rawio = self.CloseFailureIO()
705 def f():
706 self.tp(rawio).xyzzy
707 with support.captured_output("stderr") as s:
708 self.assertRaises(AttributeError, f)
709 s = s.getvalue().strip()
710 if s:
711 # The destructor *may* have printed an unraisable error, check it
712 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000713 self.assertTrue(s.startswith("Exception IOError: "), s)
714 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000715
716 def test_repr(self):
717 raw = self.MockRawIO()
718 b = self.tp(raw)
719 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
720 self.assertEqual(repr(b), "<%s>" % clsname)
721 raw.name = "dummy"
722 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
723 raw.name = b"dummy"
724 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000725
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000726 def test_flush_error_on_close(self):
727 raw = self.MockRawIO()
728 def bad_flush():
729 raise IOError()
730 raw.flush = bad_flush
731 b = self.tp(raw)
732 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -0600733 self.assertTrue(b.closed)
734
735 def test_close_error_on_close(self):
736 raw = self.MockRawIO()
737 def bad_flush():
738 raise IOError('flush')
739 def bad_close():
740 raise IOError('close')
741 raw.close = bad_close
742 b = self.tp(raw)
743 b.flush = bad_flush
744 with self.assertRaises(IOError) as err: # exception not swallowed
745 b.close()
746 self.assertEqual(err.exception.args, ('close',))
747 self.assertFalse(b.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000748
749 def test_multi_close(self):
750 raw = self.MockRawIO()
751 b = self.tp(raw)
752 b.close()
753 b.close()
754 b.close()
755 self.assertRaises(ValueError, b.flush)
756
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000757 def test_readonly_attributes(self):
758 raw = self.MockRawIO()
759 buf = self.tp(raw)
760 x = self.MockRawIO()
761 with self.assertRaises((AttributeError, TypeError)):
762 buf.raw = x
763
Christian Heimes1a6387e2008-03-26 12:49:49 +0000764
Antoine Pitroubff5df02012-07-29 19:02:46 +0200765class SizeofTest:
766
767 @support.cpython_only
768 def test_sizeof(self):
769 bufsize1 = 4096
770 bufsize2 = 8192
771 rawio = self.MockRawIO()
772 bufio = self.tp(rawio, buffer_size=bufsize1)
773 size = sys.getsizeof(bufio) - bufsize1
774 rawio = self.MockRawIO()
775 bufio = self.tp(rawio, buffer_size=bufsize2)
776 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
777
778
Antoine Pitrou19690592009-06-12 20:14:08 +0000779class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
780 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000781
Antoine Pitrou19690592009-06-12 20:14:08 +0000782 def test_constructor(self):
783 rawio = self.MockRawIO([b"abc"])
784 bufio = self.tp(rawio)
785 bufio.__init__(rawio)
786 bufio.__init__(rawio, buffer_size=1024)
787 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000788 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000789 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
790 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
791 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
792 rawio = self.MockRawIO([b"abc"])
793 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000794 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000795
Antoine Pitrou19690592009-06-12 20:14:08 +0000796 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000797 for arg in (None, 7):
798 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
799 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000800 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000801 # Invalid args
802 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000803
Antoine Pitrou19690592009-06-12 20:14:08 +0000804 def test_read1(self):
805 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
806 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000807 self.assertEqual(b"a", bufio.read(1))
808 self.assertEqual(b"b", bufio.read1(1))
809 self.assertEqual(rawio._reads, 1)
810 self.assertEqual(b"c", bufio.read1(100))
811 self.assertEqual(rawio._reads, 1)
812 self.assertEqual(b"d", bufio.read1(100))
813 self.assertEqual(rawio._reads, 2)
814 self.assertEqual(b"efg", bufio.read1(100))
815 self.assertEqual(rawio._reads, 3)
816 self.assertEqual(b"", bufio.read1(100))
817 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000818 # Invalid args
819 self.assertRaises(ValueError, bufio.read1, -1)
820
821 def test_readinto(self):
822 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
823 bufio = self.tp(rawio)
824 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000825 self.assertEqual(bufio.readinto(b), 2)
826 self.assertEqual(b, b"ab")
827 self.assertEqual(bufio.readinto(b), 2)
828 self.assertEqual(b, b"cd")
829 self.assertEqual(bufio.readinto(b), 2)
830 self.assertEqual(b, b"ef")
831 self.assertEqual(bufio.readinto(b), 1)
832 self.assertEqual(b, b"gf")
833 self.assertEqual(bufio.readinto(b), 0)
834 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000835
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000836 def test_readlines(self):
837 def bufio():
838 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
839 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000840 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
841 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
842 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000843
Antoine Pitrou19690592009-06-12 20:14:08 +0000844 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000845 data = b"abcdefghi"
846 dlen = len(data)
847
848 tests = [
849 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
850 [ 100, [ 3, 3, 3], [ dlen ] ],
851 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
852 ]
853
854 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000855 rawio = self.MockFileIO(data)
856 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000857 pos = 0
858 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000859 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000860 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000861 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000862 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000863
Antoine Pitrou19690592009-06-12 20:14:08 +0000864 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000865 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000866 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
867 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000868 self.assertEqual(b"abcd", bufio.read(6))
869 self.assertEqual(b"e", bufio.read(1))
870 self.assertEqual(b"fg", bufio.read())
871 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200872 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000873 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000874
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200875 rawio = self.MockRawIO((b"a", None, None))
876 self.assertEqual(b"a", rawio.readall())
877 self.assertIsNone(rawio.readall())
878
Antoine Pitrou19690592009-06-12 20:14:08 +0000879 def test_read_past_eof(self):
880 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
881 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000882
Ezio Melotti2623a372010-11-21 13:34:58 +0000883 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000884
Antoine Pitrou19690592009-06-12 20:14:08 +0000885 def test_read_all(self):
886 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
887 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000888
Ezio Melotti2623a372010-11-21 13:34:58 +0000889 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000890
Victor Stinner6a102812010-04-27 23:55:59 +0000891 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000892 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000893 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000894 try:
895 # Write out many bytes with exactly the same number of 0's,
896 # 1's... 255's. This will help us check that concurrent reading
897 # doesn't duplicate or forget contents.
898 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000899 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000900 random.shuffle(l)
901 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000902 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000903 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000904 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000905 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000906 errors = []
907 results = []
908 def f():
909 try:
910 # Intra-buffer read then buffer-flushing read
911 for n in cycle([1, 19]):
912 s = bufio.read(n)
913 if not s:
914 break
915 # list.append() is atomic
916 results.append(s)
917 except Exception as e:
918 errors.append(e)
919 raise
920 threads = [threading.Thread(target=f) for x in range(20)]
921 for t in threads:
922 t.start()
923 time.sleep(0.02) # yield
924 for t in threads:
925 t.join()
926 self.assertFalse(errors,
927 "the following exceptions were caught: %r" % errors)
928 s = b''.join(results)
929 for i in range(256):
930 c = bytes(bytearray([i]))
931 self.assertEqual(s.count(c), N)
932 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000933 support.unlink(support.TESTFN)
934
935 def test_misbehaved_io(self):
936 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
937 bufio = self.tp(rawio)
938 self.assertRaises(IOError, bufio.seek, 0)
939 self.assertRaises(IOError, bufio.tell)
940
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000941 def test_no_extraneous_read(self):
942 # Issue #9550; when the raw IO object has satisfied the read request,
943 # we should not issue any additional reads, otherwise it may block
944 # (e.g. socket).
945 bufsize = 16
946 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
947 rawio = self.MockRawIO([b"x" * n])
948 bufio = self.tp(rawio, bufsize)
949 self.assertEqual(bufio.read(n), b"x" * n)
950 # Simple case: one raw read is enough to satisfy the request.
951 self.assertEqual(rawio._extraneous_reads, 0,
952 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
953 # A more complex case where two raw reads are needed to satisfy
954 # the request.
955 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
956 bufio = self.tp(rawio, bufsize)
957 self.assertEqual(bufio.read(n), b"x" * n)
958 self.assertEqual(rawio._extraneous_reads, 0,
959 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
960
961
Antoine Pitroubff5df02012-07-29 19:02:46 +0200962class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +0000963 tp = io.BufferedReader
964
965 def test_constructor(self):
966 BufferedReaderTest.test_constructor(self)
967 # The allocation can succeed on 32-bit builds, e.g. with more
968 # than 2GB RAM and a 64-bit kernel.
969 if sys.maxsize > 0x7FFFFFFF:
970 rawio = self.MockRawIO()
971 bufio = self.tp(rawio)
972 self.assertRaises((OverflowError, MemoryError, ValueError),
973 bufio.__init__, rawio, sys.maxsize)
974
975 def test_initialization(self):
976 rawio = self.MockRawIO([b"abc"])
977 bufio = self.tp(rawio)
978 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
979 self.assertRaises(ValueError, bufio.read)
980 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
981 self.assertRaises(ValueError, bufio.read)
982 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
983 self.assertRaises(ValueError, bufio.read)
984
985 def test_misbehaved_io_read(self):
986 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
987 bufio = self.tp(rawio)
988 # _pyio.BufferedReader seems to implement reading different, so that
989 # checking this is not so easy.
990 self.assertRaises(IOError, bufio.read, 10)
991
992 def test_garbage_collection(self):
993 # C BufferedReader objects are collected.
994 # The Python version has __del__, so it ends into gc.garbage instead
995 rawio = self.FileIO(support.TESTFN, "w+b")
996 f = self.tp(rawio)
997 f.f = f
998 wr = weakref.ref(f)
999 del f
1000 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001001 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00001002
R David Murray5b2cf5e2013-02-23 22:11:21 -05001003 def test_args_error(self):
1004 # Issue #17275
1005 with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1006 self.tp(io.BytesIO(), 1024, 1024, 1024)
1007
1008
Antoine Pitrou19690592009-06-12 20:14:08 +00001009class PyBufferedReaderTest(BufferedReaderTest):
1010 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001011
1012
Antoine Pitrou19690592009-06-12 20:14:08 +00001013class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1014 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001015
Antoine Pitrou19690592009-06-12 20:14:08 +00001016 def test_constructor(self):
1017 rawio = self.MockRawIO()
1018 bufio = self.tp(rawio)
1019 bufio.__init__(rawio)
1020 bufio.__init__(rawio, buffer_size=1024)
1021 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +00001022 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001023 bufio.flush()
1024 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1025 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1026 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1027 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +00001028 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +00001029 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001030 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001031
Antoine Pitrou19690592009-06-12 20:14:08 +00001032 def test_detach_flush(self):
1033 raw = self.MockRawIO()
1034 buf = self.tp(raw)
1035 buf.write(b"howdy!")
1036 self.assertFalse(raw._write_stack)
1037 buf.detach()
1038 self.assertEqual(raw._write_stack, [b"howdy!"])
1039
1040 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001041 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +00001042 writer = self.MockRawIO()
1043 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001044 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001045 self.assertFalse(writer._write_stack)
1046
Antoine Pitrou19690592009-06-12 20:14:08 +00001047 def test_write_overflow(self):
1048 writer = self.MockRawIO()
1049 bufio = self.tp(writer, 8)
1050 contents = b"abcdefghijklmnop"
1051 for n in range(0, len(contents), 3):
1052 bufio.write(contents[n:n+3])
1053 flushed = b"".join(writer._write_stack)
1054 # At least (total - 8) bytes were implicitly flushed, perhaps more
1055 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001056 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001057
Antoine Pitrou19690592009-06-12 20:14:08 +00001058 def check_writes(self, intermediate_func):
1059 # Lots of writes, test the flushed output is as expected.
1060 contents = bytes(range(256)) * 1000
1061 n = 0
1062 writer = self.MockRawIO()
1063 bufio = self.tp(writer, 13)
1064 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1065 def gen_sizes():
1066 for size in count(1):
1067 for i in range(15):
1068 yield size
1069 sizes = gen_sizes()
1070 while n < len(contents):
1071 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +00001072 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001073 intermediate_func(bufio)
1074 n += size
1075 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001076 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001078
Antoine Pitrou19690592009-06-12 20:14:08 +00001079 def test_writes(self):
1080 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001081
Antoine Pitrou19690592009-06-12 20:14:08 +00001082 def test_writes_and_flushes(self):
1083 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001084
Antoine Pitrou19690592009-06-12 20:14:08 +00001085 def test_writes_and_seeks(self):
1086 def _seekabs(bufio):
1087 pos = bufio.tell()
1088 bufio.seek(pos + 1, 0)
1089 bufio.seek(pos - 1, 0)
1090 bufio.seek(pos, 0)
1091 self.check_writes(_seekabs)
1092 def _seekrel(bufio):
1093 pos = bufio.seek(0, 1)
1094 bufio.seek(+1, 1)
1095 bufio.seek(-1, 1)
1096 bufio.seek(pos, 0)
1097 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001098
Antoine Pitrou19690592009-06-12 20:14:08 +00001099 def test_writes_and_truncates(self):
1100 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001101
Antoine Pitrou19690592009-06-12 20:14:08 +00001102 def test_write_non_blocking(self):
1103 raw = self.MockNonBlockWriterIO()
1104 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001105
Ezio Melotti2623a372010-11-21 13:34:58 +00001106 self.assertEqual(bufio.write(b"abcd"), 4)
1107 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001108 # 1 byte will be written, the rest will be buffered
1109 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001110 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001111
Antoine Pitrou19690592009-06-12 20:14:08 +00001112 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1113 raw.block_on(b"0")
1114 try:
1115 bufio.write(b"opqrwxyz0123456789")
1116 except self.BlockingIOError as e:
1117 written = e.characters_written
1118 else:
1119 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001120 self.assertEqual(written, 16)
1121 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001122 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001123
Ezio Melotti2623a372010-11-21 13:34:58 +00001124 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001125 s = raw.pop_written()
1126 # Previously buffered bytes were flushed
1127 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001128
Antoine Pitrou19690592009-06-12 20:14:08 +00001129 def test_write_and_rewind(self):
1130 raw = io.BytesIO()
1131 bufio = self.tp(raw, 4)
1132 self.assertEqual(bufio.write(b"abcdef"), 6)
1133 self.assertEqual(bufio.tell(), 6)
1134 bufio.seek(0, 0)
1135 self.assertEqual(bufio.write(b"XY"), 2)
1136 bufio.seek(6, 0)
1137 self.assertEqual(raw.getvalue(), b"XYcdef")
1138 self.assertEqual(bufio.write(b"123456"), 6)
1139 bufio.flush()
1140 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001141
Antoine Pitrou19690592009-06-12 20:14:08 +00001142 def test_flush(self):
1143 writer = self.MockRawIO()
1144 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001145 bufio.write(b"abc")
1146 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001147 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001148
Antoine Pitrou78e761e2012-10-16 22:57:11 +02001149 def test_writelines(self):
1150 l = [b'ab', b'cd', b'ef']
1151 writer = self.MockRawIO()
1152 bufio = self.tp(writer, 8)
1153 bufio.writelines(l)
1154 bufio.flush()
1155 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1156
1157 def test_writelines_userlist(self):
1158 l = UserList([b'ab', b'cd', b'ef'])
1159 writer = self.MockRawIO()
1160 bufio = self.tp(writer, 8)
1161 bufio.writelines(l)
1162 bufio.flush()
1163 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1164
1165 def test_writelines_error(self):
1166 writer = self.MockRawIO()
1167 bufio = self.tp(writer, 8)
1168 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1169 self.assertRaises(TypeError, bufio.writelines, None)
1170
Antoine Pitrou19690592009-06-12 20:14:08 +00001171 def test_destructor(self):
1172 writer = self.MockRawIO()
1173 bufio = self.tp(writer, 8)
1174 bufio.write(b"abc")
1175 del bufio
1176 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001177 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001178
1179 def test_truncate(self):
1180 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001181 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001182 bufio = self.tp(raw, 8)
1183 bufio.write(b"abcdef")
1184 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001185 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001186 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001187 self.assertEqual(f.read(), b"abc")
1188
Victor Stinner6a102812010-04-27 23:55:59 +00001189 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001190 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001191 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001192 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001193 # Write out many bytes from many threads and test they were
1194 # all flushed.
1195 N = 1000
1196 contents = bytes(range(256)) * N
1197 sizes = cycle([1, 19])
1198 n = 0
1199 queue = deque()
1200 while n < len(contents):
1201 size = next(sizes)
1202 queue.append(contents[n:n+size])
1203 n += size
1204 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001205 # We use a real file object because it allows us to
1206 # exercise situations where the GIL is released before
1207 # writing the buffer to the raw streams. This is in addition
1208 # to concurrency issues due to switching threads in the middle
1209 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001210 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001211 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001212 errors = []
1213 def f():
1214 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001215 while True:
1216 try:
1217 s = queue.popleft()
1218 except IndexError:
1219 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001220 bufio.write(s)
1221 except Exception as e:
1222 errors.append(e)
1223 raise
1224 threads = [threading.Thread(target=f) for x in range(20)]
1225 for t in threads:
1226 t.start()
1227 time.sleep(0.02) # yield
1228 for t in threads:
1229 t.join()
1230 self.assertFalse(errors,
1231 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001232 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001233 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001234 s = f.read()
1235 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001236 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001237 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001238 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001239
Antoine Pitrou19690592009-06-12 20:14:08 +00001240 def test_misbehaved_io(self):
1241 rawio = self.MisbehavedRawIO()
1242 bufio = self.tp(rawio, 5)
1243 self.assertRaises(IOError, bufio.seek, 0)
1244 self.assertRaises(IOError, bufio.tell)
1245 self.assertRaises(IOError, bufio.write, b"abcdef")
1246
1247 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001248 with support.check_warnings(("max_buffer_size is deprecated",
1249 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001250 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001251
Benjamin Petersona2d6d712012-12-20 12:24:10 -06001252 def test_write_error_on_close(self):
1253 raw = self.MockRawIO()
1254 def bad_write(b):
1255 raise IOError()
1256 raw.write = bad_write
1257 b = self.tp(raw)
1258 b.write(b'spam')
1259 self.assertRaises(IOError, b.close) # exception not swallowed
1260 self.assertTrue(b.closed)
1261
Antoine Pitrou19690592009-06-12 20:14:08 +00001262
Antoine Pitroubff5df02012-07-29 19:02:46 +02001263class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001264 tp = io.BufferedWriter
1265
1266 def test_constructor(self):
1267 BufferedWriterTest.test_constructor(self)
1268 # The allocation can succeed on 32-bit builds, e.g. with more
1269 # than 2GB RAM and a 64-bit kernel.
1270 if sys.maxsize > 0x7FFFFFFF:
1271 rawio = self.MockRawIO()
1272 bufio = self.tp(rawio)
1273 self.assertRaises((OverflowError, MemoryError, ValueError),
1274 bufio.__init__, rawio, sys.maxsize)
1275
1276 def test_initialization(self):
1277 rawio = self.MockRawIO()
1278 bufio = self.tp(rawio)
1279 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1280 self.assertRaises(ValueError, bufio.write, b"def")
1281 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1282 self.assertRaises(ValueError, bufio.write, b"def")
1283 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1284 self.assertRaises(ValueError, bufio.write, b"def")
1285
1286 def test_garbage_collection(self):
1287 # C BufferedWriter objects are collected, and collecting them flushes
1288 # all data to disk.
1289 # The Python version has __del__, so it ends into gc.garbage instead
1290 rawio = self.FileIO(support.TESTFN, "w+b")
1291 f = self.tp(rawio)
1292 f.write(b"123xxx")
1293 f.x = f
1294 wr = weakref.ref(f)
1295 del f
1296 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001297 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001298 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001299 self.assertEqual(f.read(), b"123xxx")
1300
R David Murray5b2cf5e2013-02-23 22:11:21 -05001301 def test_args_error(self):
1302 # Issue #17275
1303 with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1304 self.tp(io.BytesIO(), 1024, 1024, 1024)
1305
Antoine Pitrou19690592009-06-12 20:14:08 +00001306
1307class PyBufferedWriterTest(BufferedWriterTest):
1308 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001309
1310class BufferedRWPairTest(unittest.TestCase):
1311
Antoine Pitrou19690592009-06-12 20:14:08 +00001312 def test_constructor(self):
1313 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001314 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001315
Antoine Pitrou19690592009-06-12 20:14:08 +00001316 def test_detach(self):
1317 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1318 self.assertRaises(self.UnsupportedOperation, pair.detach)
1319
1320 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001321 with support.check_warnings(("max_buffer_size is deprecated",
1322 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001323 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001324
1325 def test_constructor_with_not_readable(self):
1326 class NotReadable(MockRawIO):
1327 def readable(self):
1328 return False
1329
1330 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1331
1332 def test_constructor_with_not_writeable(self):
1333 class NotWriteable(MockRawIO):
1334 def writable(self):
1335 return False
1336
1337 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1338
1339 def test_read(self):
1340 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1341
1342 self.assertEqual(pair.read(3), b"abc")
1343 self.assertEqual(pair.read(1), b"d")
1344 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001345 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1346 self.assertEqual(pair.read(None), b"abc")
1347
1348 def test_readlines(self):
1349 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1350 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1351 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1352 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001353
1354 def test_read1(self):
1355 # .read1() is delegated to the underlying reader object, so this test
1356 # can be shallow.
1357 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1358
1359 self.assertEqual(pair.read1(3), b"abc")
1360
1361 def test_readinto(self):
1362 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1363
1364 data = bytearray(5)
1365 self.assertEqual(pair.readinto(data), 5)
1366 self.assertEqual(data, b"abcde")
1367
1368 def test_write(self):
1369 w = self.MockRawIO()
1370 pair = self.tp(self.MockRawIO(), w)
1371
1372 pair.write(b"abc")
1373 pair.flush()
1374 pair.write(b"def")
1375 pair.flush()
1376 self.assertEqual(w._write_stack, [b"abc", b"def"])
1377
1378 def test_peek(self):
1379 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1380
1381 self.assertTrue(pair.peek(3).startswith(b"abc"))
1382 self.assertEqual(pair.read(3), b"abc")
1383
1384 def test_readable(self):
1385 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1386 self.assertTrue(pair.readable())
1387
1388 def test_writeable(self):
1389 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1390 self.assertTrue(pair.writable())
1391
1392 def test_seekable(self):
1393 # BufferedRWPairs are never seekable, even if their readers and writers
1394 # are.
1395 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1396 self.assertFalse(pair.seekable())
1397
1398 # .flush() is delegated to the underlying writer object and has been
1399 # tested in the test_write method.
1400
1401 def test_close_and_closed(self):
1402 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1403 self.assertFalse(pair.closed)
1404 pair.close()
1405 self.assertTrue(pair.closed)
1406
1407 def test_isatty(self):
1408 class SelectableIsAtty(MockRawIO):
1409 def __init__(self, isatty):
1410 MockRawIO.__init__(self)
1411 self._isatty = isatty
1412
1413 def isatty(self):
1414 return self._isatty
1415
1416 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1417 self.assertFalse(pair.isatty())
1418
1419 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1420 self.assertTrue(pair.isatty())
1421
1422 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1423 self.assertTrue(pair.isatty())
1424
1425 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1426 self.assertTrue(pair.isatty())
1427
1428class CBufferedRWPairTest(BufferedRWPairTest):
1429 tp = io.BufferedRWPair
1430
1431class PyBufferedRWPairTest(BufferedRWPairTest):
1432 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001433
1434
Antoine Pitrou19690592009-06-12 20:14:08 +00001435class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1436 read_mode = "rb+"
1437 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001438
Antoine Pitrou19690592009-06-12 20:14:08 +00001439 def test_constructor(self):
1440 BufferedReaderTest.test_constructor(self)
1441 BufferedWriterTest.test_constructor(self)
1442
1443 def test_read_and_write(self):
1444 raw = self.MockRawIO((b"asdf", b"ghjk"))
1445 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001446
1447 self.assertEqual(b"as", rw.read(2))
1448 rw.write(b"ddd")
1449 rw.write(b"eee")
1450 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001451 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001452 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001453
Antoine Pitrou19690592009-06-12 20:14:08 +00001454 def test_seek_and_tell(self):
1455 raw = self.BytesIO(b"asdfghjkl")
1456 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001457
Ezio Melotti2623a372010-11-21 13:34:58 +00001458 self.assertEqual(b"as", rw.read(2))
1459 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001460 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001461 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001462
Antoine Pitrou808cec52011-08-20 15:40:58 +02001463 rw.write(b"123f")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001464 rw.seek(0, 0)
Antoine Pitrou808cec52011-08-20 15:40:58 +02001465 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001466 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001467 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001468 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001469 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001470 self.assertEqual(7, rw.tell())
1471 self.assertEqual(b"fl", rw.read(11))
Antoine Pitrou808cec52011-08-20 15:40:58 +02001472 rw.flush()
1473 self.assertEqual(b"asdf123fl", raw.getvalue())
1474
Christian Heimes1a6387e2008-03-26 12:49:49 +00001475 self.assertRaises(TypeError, rw.seek, 0.0)
1476
Antoine Pitrou19690592009-06-12 20:14:08 +00001477 def check_flush_and_read(self, read_func):
1478 raw = self.BytesIO(b"abcdefghi")
1479 bufio = self.tp(raw)
1480
Ezio Melotti2623a372010-11-21 13:34:58 +00001481 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001482 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001483 self.assertEqual(b"ef", read_func(bufio, 2))
1484 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001485 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001486 self.assertEqual(6, bufio.tell())
1487 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001488 raw.seek(0, 0)
1489 raw.write(b"XYZ")
1490 # flush() resets the read buffer
1491 bufio.flush()
1492 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001493 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001494
1495 def test_flush_and_read(self):
1496 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1497
1498 def test_flush_and_readinto(self):
1499 def _readinto(bufio, n=-1):
1500 b = bytearray(n if n >= 0 else 9999)
1501 n = bufio.readinto(b)
1502 return bytes(b[:n])
1503 self.check_flush_and_read(_readinto)
1504
1505 def test_flush_and_peek(self):
1506 def _peek(bufio, n=-1):
1507 # This relies on the fact that the buffer can contain the whole
1508 # raw stream, otherwise peek() can return less.
1509 b = bufio.peek(n)
1510 if n != -1:
1511 b = b[:n]
1512 bufio.seek(len(b), 1)
1513 return b
1514 self.check_flush_and_read(_peek)
1515
1516 def test_flush_and_write(self):
1517 raw = self.BytesIO(b"abcdefghi")
1518 bufio = self.tp(raw)
1519
1520 bufio.write(b"123")
1521 bufio.flush()
1522 bufio.write(b"45")
1523 bufio.flush()
1524 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001525 self.assertEqual(b"12345fghi", raw.getvalue())
1526 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001527
1528 def test_threads(self):
1529 BufferedReaderTest.test_threads(self)
1530 BufferedWriterTest.test_threads(self)
1531
1532 def test_writes_and_peek(self):
1533 def _peek(bufio):
1534 bufio.peek(1)
1535 self.check_writes(_peek)
1536 def _peek(bufio):
1537 pos = bufio.tell()
1538 bufio.seek(-1, 1)
1539 bufio.peek(1)
1540 bufio.seek(pos, 0)
1541 self.check_writes(_peek)
1542
1543 def test_writes_and_reads(self):
1544 def _read(bufio):
1545 bufio.seek(-1, 1)
1546 bufio.read(1)
1547 self.check_writes(_read)
1548
1549 def test_writes_and_read1s(self):
1550 def _read1(bufio):
1551 bufio.seek(-1, 1)
1552 bufio.read1(1)
1553 self.check_writes(_read1)
1554
1555 def test_writes_and_readintos(self):
1556 def _read(bufio):
1557 bufio.seek(-1, 1)
1558 bufio.readinto(bytearray(1))
1559 self.check_writes(_read)
1560
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001561 def test_write_after_readahead(self):
1562 # Issue #6629: writing after the buffer was filled by readahead should
1563 # first rewind the raw stream.
1564 for overwrite_size in [1, 5]:
1565 raw = self.BytesIO(b"A" * 10)
1566 bufio = self.tp(raw, 4)
1567 # Trigger readahead
1568 self.assertEqual(bufio.read(1), b"A")
1569 self.assertEqual(bufio.tell(), 1)
1570 # Overwriting should rewind the raw stream if it needs so
1571 bufio.write(b"B" * overwrite_size)
1572 self.assertEqual(bufio.tell(), overwrite_size + 1)
1573 # If the write size was smaller than the buffer size, flush() and
1574 # check that rewind happens.
1575 bufio.flush()
1576 self.assertEqual(bufio.tell(), overwrite_size + 1)
1577 s = raw.getvalue()
1578 self.assertEqual(s,
1579 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1580
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001581 def test_write_rewind_write(self):
1582 # Various combinations of reading / writing / seeking backwards / writing again
1583 def mutate(bufio, pos1, pos2):
1584 assert pos2 >= pos1
1585 # Fill the buffer
1586 bufio.seek(pos1)
1587 bufio.read(pos2 - pos1)
1588 bufio.write(b'\x02')
1589 # This writes earlier than the previous write, but still inside
1590 # the buffer.
1591 bufio.seek(pos1)
1592 bufio.write(b'\x01')
1593
1594 b = b"\x80\x81\x82\x83\x84"
1595 for i in range(0, len(b)):
1596 for j in range(i, len(b)):
1597 raw = self.BytesIO(b)
1598 bufio = self.tp(raw, 100)
1599 mutate(bufio, i, j)
1600 bufio.flush()
1601 expected = bytearray(b)
1602 expected[j] = 2
1603 expected[i] = 1
1604 self.assertEqual(raw.getvalue(), expected,
1605 "failed result for i=%d, j=%d" % (i, j))
1606
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001607 def test_truncate_after_read_or_write(self):
1608 raw = self.BytesIO(b"A" * 10)
1609 bufio = self.tp(raw, 100)
1610 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1611 self.assertEqual(bufio.truncate(), 2)
1612 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1613 self.assertEqual(bufio.truncate(), 4)
1614
Antoine Pitrou19690592009-06-12 20:14:08 +00001615 def test_misbehaved_io(self):
1616 BufferedReaderTest.test_misbehaved_io(self)
1617 BufferedWriterTest.test_misbehaved_io(self)
1618
Antoine Pitrou808cec52011-08-20 15:40:58 +02001619 def test_interleaved_read_write(self):
1620 # Test for issue #12213
1621 with self.BytesIO(b'abcdefgh') as raw:
1622 with self.tp(raw, 100) as f:
1623 f.write(b"1")
1624 self.assertEqual(f.read(1), b'b')
1625 f.write(b'2')
1626 self.assertEqual(f.read1(1), b'd')
1627 f.write(b'3')
1628 buf = bytearray(1)
1629 f.readinto(buf)
1630 self.assertEqual(buf, b'f')
1631 f.write(b'4')
1632 self.assertEqual(f.peek(1), b'h')
1633 f.flush()
1634 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1635
1636 with self.BytesIO(b'abc') as raw:
1637 with self.tp(raw, 100) as f:
1638 self.assertEqual(f.read(1), b'a')
1639 f.write(b"2")
1640 self.assertEqual(f.read(1), b'c')
1641 f.flush()
1642 self.assertEqual(raw.getvalue(), b'a2c')
1643
1644 def test_interleaved_readline_write(self):
1645 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1646 with self.tp(raw) as f:
1647 f.write(b'1')
1648 self.assertEqual(f.readline(), b'b\n')
1649 f.write(b'2')
1650 self.assertEqual(f.readline(), b'def\n')
1651 f.write(b'3')
1652 self.assertEqual(f.readline(), b'\n')
1653 f.flush()
1654 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1655
R David Murray5b2cf5e2013-02-23 22:11:21 -05001656
Antoine Pitroubff5df02012-07-29 19:02:46 +02001657class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1658 BufferedRandomTest, SizeofTest):
Antoine Pitrou19690592009-06-12 20:14:08 +00001659 tp = io.BufferedRandom
1660
1661 def test_constructor(self):
1662 BufferedRandomTest.test_constructor(self)
1663 # The allocation can succeed on 32-bit builds, e.g. with more
1664 # than 2GB RAM and a 64-bit kernel.
1665 if sys.maxsize > 0x7FFFFFFF:
1666 rawio = self.MockRawIO()
1667 bufio = self.tp(rawio)
1668 self.assertRaises((OverflowError, MemoryError, ValueError),
1669 bufio.__init__, rawio, sys.maxsize)
1670
1671 def test_garbage_collection(self):
1672 CBufferedReaderTest.test_garbage_collection(self)
1673 CBufferedWriterTest.test_garbage_collection(self)
1674
R David Murray5b2cf5e2013-02-23 22:11:21 -05001675 def test_args_error(self):
1676 # Issue #17275
1677 with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1678 self.tp(io.BytesIO(), 1024, 1024, 1024)
1679
1680
Antoine Pitrou19690592009-06-12 20:14:08 +00001681class PyBufferedRandomTest(BufferedRandomTest):
1682 tp = pyio.BufferedRandom
1683
1684
Christian Heimes1a6387e2008-03-26 12:49:49 +00001685# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1686# properties:
1687# - A single output character can correspond to many bytes of input.
1688# - The number of input bytes to complete the character can be
1689# undetermined until the last input byte is received.
1690# - The number of input bytes can vary depending on previous input.
1691# - A single input byte can correspond to many characters of output.
1692# - The number of output characters can be undetermined until the
1693# last input byte is received.
1694# - The number of output characters can vary depending on previous input.
1695
1696class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1697 """
1698 For testing seek/tell behavior with a stateful, buffering decoder.
1699
1700 Input is a sequence of words. Words may be fixed-length (length set
1701 by input) or variable-length (period-terminated). In variable-length
1702 mode, extra periods are ignored. Possible words are:
1703 - 'i' followed by a number sets the input length, I (maximum 99).
1704 When I is set to 0, words are space-terminated.
1705 - 'o' followed by a number sets the output length, O (maximum 99).
1706 - Any other word is converted into a word followed by a period on
1707 the output. The output word consists of the input word truncated
1708 or padded out with hyphens to make its length equal to O. If O
1709 is 0, the word is output verbatim without truncating or padding.
1710 I and O are initially set to 1. When I changes, any buffered input is
1711 re-scanned according to the new I. EOF also terminates the last word.
1712 """
1713
1714 def __init__(self, errors='strict'):
1715 codecs.IncrementalDecoder.__init__(self, errors)
1716 self.reset()
1717
1718 def __repr__(self):
1719 return '<SID %x>' % id(self)
1720
1721 def reset(self):
1722 self.i = 1
1723 self.o = 1
1724 self.buffer = bytearray()
1725
1726 def getstate(self):
1727 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1728 return bytes(self.buffer), i*100 + o
1729
1730 def setstate(self, state):
1731 buffer, io = state
1732 self.buffer = bytearray(buffer)
1733 i, o = divmod(io, 100)
1734 self.i, self.o = i ^ 1, o ^ 1
1735
1736 def decode(self, input, final=False):
1737 output = ''
1738 for b in input:
1739 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001740 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001741 if self.buffer:
1742 output += self.process_word()
1743 else:
1744 self.buffer.append(b)
1745 else: # fixed-length, terminate after self.i bytes
1746 self.buffer.append(b)
1747 if len(self.buffer) == self.i:
1748 output += self.process_word()
1749 if final and self.buffer: # EOF terminates the last word
1750 output += self.process_word()
1751 return output
1752
1753 def process_word(self):
1754 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001755 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001756 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001757 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001758 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1759 else:
1760 output = self.buffer.decode('ascii')
1761 if len(output) < self.o:
1762 output += '-'*self.o # pad out with hyphens
1763 if self.o:
1764 output = output[:self.o] # truncate to output length
1765 output += '.'
1766 self.buffer = bytearray()
1767 return output
1768
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001769 codecEnabled = False
1770
1771 @classmethod
1772 def lookupTestDecoder(cls, name):
1773 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001774 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001775 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001776 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001777 incrementalencoder=None,
1778 streamreader=None, streamwriter=None,
1779 incrementaldecoder=cls)
1780
1781# Register the previous decoder for testing.
1782# Disabled by default, tests will enable it.
1783codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1784
1785
Christian Heimes1a6387e2008-03-26 12:49:49 +00001786class StatefulIncrementalDecoderTest(unittest.TestCase):
1787 """
1788 Make sure the StatefulIncrementalDecoder actually works.
1789 """
1790
1791 test_cases = [
1792 # I=1, O=1 (fixed-length input == fixed-length output)
1793 (b'abcd', False, 'a.b.c.d.'),
1794 # I=0, O=0 (variable-length input, variable-length output)
1795 (b'oiabcd', True, 'abcd.'),
1796 # I=0, O=0 (should ignore extra periods)
1797 (b'oi...abcd...', True, 'abcd.'),
1798 # I=0, O=6 (variable-length input, fixed-length output)
1799 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1800 # I=2, O=6 (fixed-length input < fixed-length output)
1801 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1802 # I=6, O=3 (fixed-length input > fixed-length output)
1803 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1804 # I=0, then 3; O=29, then 15 (with longer output)
1805 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1806 'a----------------------------.' +
1807 'b----------------------------.' +
1808 'cde--------------------------.' +
1809 'abcdefghijabcde.' +
1810 'a.b------------.' +
1811 '.c.------------.' +
1812 'd.e------------.' +
1813 'k--------------.' +
1814 'l--------------.' +
1815 'm--------------.')
1816 ]
1817
Antoine Pitrou19690592009-06-12 20:14:08 +00001818 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001819 # Try a few one-shot test cases.
1820 for input, eof, output in self.test_cases:
1821 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001822 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001823
1824 # Also test an unfinished decode, followed by forcing EOF.
1825 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001826 self.assertEqual(d.decode(b'oiabcd'), '')
1827 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001828
1829class TextIOWrapperTest(unittest.TestCase):
1830
1831 def setUp(self):
1832 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1833 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001834 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835
1836 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001837 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001838
Antoine Pitrou19690592009-06-12 20:14:08 +00001839 def test_constructor(self):
1840 r = self.BytesIO(b"\xc3\xa9\n\n")
1841 b = self.BufferedReader(r, 1000)
1842 t = self.TextIOWrapper(b)
1843 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001844 self.assertEqual(t.encoding, "latin1")
1845 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001846 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001847 self.assertEqual(t.encoding, "utf8")
1848 self.assertEqual(t.line_buffering, True)
1849 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 self.assertRaises(TypeError, t.__init__, b, newline=42)
1851 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1852
1853 def test_detach(self):
1854 r = self.BytesIO()
1855 b = self.BufferedWriter(r)
1856 t = self.TextIOWrapper(b)
1857 self.assertIs(t.detach(), b)
1858
1859 t = self.TextIOWrapper(b, encoding="ascii")
1860 t.write("howdy")
1861 self.assertFalse(r.getvalue())
1862 t.detach()
1863 self.assertEqual(r.getvalue(), b"howdy")
1864 self.assertRaises(ValueError, t.detach)
1865
1866 def test_repr(self):
1867 raw = self.BytesIO("hello".encode("utf-8"))
1868 b = self.BufferedReader(raw)
1869 t = self.TextIOWrapper(b, encoding="utf-8")
1870 modname = self.TextIOWrapper.__module__
1871 self.assertEqual(repr(t),
1872 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1873 raw.name = "dummy"
1874 self.assertEqual(repr(t),
1875 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1876 raw.name = b"dummy"
1877 self.assertEqual(repr(t),
1878 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1879
1880 def test_line_buffering(self):
1881 r = self.BytesIO()
1882 b = self.BufferedWriter(r, 1000)
1883 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1884 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001885 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001886 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001887 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001889 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001890
Antoine Pitrou19690592009-06-12 20:14:08 +00001891 def test_encoding(self):
1892 # Check the encoding attribute is always set, and valid
1893 b = self.BytesIO()
1894 t = self.TextIOWrapper(b, encoding="utf8")
1895 self.assertEqual(t.encoding, "utf8")
1896 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001897 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001898 codecs.lookup(t.encoding)
1899
1900 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001901 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001902 b = self.BytesIO(b"abc\n\xff\n")
1903 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001904 self.assertRaises(UnicodeError, t.read)
1905 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001906 b = self.BytesIO(b"abc\n\xff\n")
1907 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908 self.assertRaises(UnicodeError, t.read)
1909 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001910 b = self.BytesIO(b"abc\n\xff\n")
1911 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001912 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001913 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 b = self.BytesIO(b"abc\n\xff\n")
1915 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001916 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001917
Antoine Pitrou19690592009-06-12 20:14:08 +00001918 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001919 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001920 b = self.BytesIO()
1921 t = self.TextIOWrapper(b, encoding="ascii")
1922 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001923 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001924 b = self.BytesIO()
1925 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1926 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001927 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001928 b = self.BytesIO()
1929 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001930 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001931 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001932 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001933 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001934 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001935 b = self.BytesIO()
1936 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001937 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001938 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001940 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001941
Antoine Pitrou19690592009-06-12 20:14:08 +00001942 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1944
1945 tests = [
1946 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1947 [ '', input_lines ],
1948 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1949 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1950 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1951 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001952 encodings = (
1953 'utf-8', 'latin-1',
1954 'utf-16', 'utf-16-le', 'utf-16-be',
1955 'utf-32', 'utf-32-le', 'utf-32-be',
1956 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001957
1958 # Try a range of buffer sizes to test the case where \r is the last
1959 # character in TextIOWrapper._pending_line.
1960 for encoding in encodings:
1961 # XXX: str.encode() should return bytes
1962 data = bytes(''.join(input_lines).encode(encoding))
1963 for do_reads in (False, True):
1964 for bufsize in range(1, 10):
1965 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001966 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1967 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968 encoding=encoding)
1969 if do_reads:
1970 got_lines = []
1971 while True:
1972 c2 = textio.read(2)
1973 if c2 == '':
1974 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001975 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001976 got_lines.append(c2 + textio.readline())
1977 else:
1978 got_lines = list(textio)
1979
1980 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001981 self.assertEqual(got_line, exp_line)
1982 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 def test_newlines_input(self):
1985 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001986 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1987 for newline, expected in [
1988 (None, normalized.decode("ascii").splitlines(True)),
1989 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1991 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1992 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001993 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 buf = self.BytesIO(testdata)
1995 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001996 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001997 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001998 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001999
Antoine Pitrou19690592009-06-12 20:14:08 +00002000 def test_newlines_output(self):
2001 testdict = {
2002 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2003 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2004 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2005 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2006 }
2007 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2008 for newline, expected in tests:
2009 buf = self.BytesIO()
2010 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2011 txt.write("AAA\nB")
2012 txt.write("BB\nCCC\n")
2013 txt.write("X\rY\r\nZ")
2014 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00002015 self.assertEqual(buf.closed, False)
2016 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00002017
2018 def test_destructor(self):
2019 l = []
2020 base = self.BytesIO
2021 class MyBytesIO(base):
2022 def close(self):
2023 l.append(self.getvalue())
2024 base.close(self)
2025 b = MyBytesIO()
2026 t = self.TextIOWrapper(b, encoding="ascii")
2027 t.write("abc")
2028 del t
2029 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00002030 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00002031
2032 def test_override_destructor(self):
2033 record = []
2034 class MyTextIO(self.TextIOWrapper):
2035 def __del__(self):
2036 record.append(1)
2037 try:
2038 f = super(MyTextIO, self).__del__
2039 except AttributeError:
2040 pass
2041 else:
2042 f()
2043 def close(self):
2044 record.append(2)
2045 super(MyTextIO, self).close()
2046 def flush(self):
2047 record.append(3)
2048 super(MyTextIO, self).flush()
2049 b = self.BytesIO()
2050 t = MyTextIO(b, encoding="ascii")
2051 del t
2052 support.gc_collect()
2053 self.assertEqual(record, [1, 2, 3])
2054
2055 def test_error_through_destructor(self):
2056 # Test that the exception state is not modified by a destructor,
2057 # even if close() fails.
2058 rawio = self.CloseFailureIO()
2059 def f():
2060 self.TextIOWrapper(rawio).xyzzy
2061 with support.captured_output("stderr") as s:
2062 self.assertRaises(AttributeError, f)
2063 s = s.getvalue().strip()
2064 if s:
2065 # The destructor *may* have printed an unraisable error, check it
2066 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002067 self.assertTrue(s.startswith("Exception IOError: "), s)
2068 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069
2070 # Systematic tests of the text I/O API
2071
Antoine Pitrou19690592009-06-12 20:14:08 +00002072 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002073 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2074 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00002075 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002076 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002077 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002078 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002079 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002080 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00002081 self.assertEqual(f.tell(), 0)
2082 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002083 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002084 self.assertEqual(f.seek(0), 0)
2085 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002086 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002087 self.assertEqual(f.read(2), "ab")
2088 self.assertEqual(f.read(1), "c")
2089 self.assertEqual(f.read(1), "")
2090 self.assertEqual(f.read(), "")
2091 self.assertEqual(f.tell(), cookie)
2092 self.assertEqual(f.seek(0), 0)
2093 self.assertEqual(f.seek(0, 2), cookie)
2094 self.assertEqual(f.write("def"), 3)
2095 self.assertEqual(f.seek(cookie), cookie)
2096 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002097 if enc.startswith("utf"):
2098 self.multi_line_test(f, enc)
2099 f.close()
2100
2101 def multi_line_test(self, f, enc):
2102 f.seek(0)
2103 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00002104 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002105 wlines = []
2106 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2107 chars = []
2108 for i in range(size):
2109 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00002110 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002111 wlines.append((f.tell(), line))
2112 f.write(line)
2113 f.seek(0)
2114 rlines = []
2115 while True:
2116 pos = f.tell()
2117 line = f.readline()
2118 if not line:
2119 break
2120 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00002121 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002122
Antoine Pitrou19690592009-06-12 20:14:08 +00002123 def test_telling(self):
2124 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002125 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002126 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00002128 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002129 p2 = f.tell()
2130 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002131 self.assertEqual(f.tell(), p0)
2132 self.assertEqual(f.readline(), "\xff\n")
2133 self.assertEqual(f.tell(), p1)
2134 self.assertEqual(f.readline(), "\xff\n")
2135 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002136 f.seek(0)
2137 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002138 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00002140 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002141 f.close()
2142
Antoine Pitrou19690592009-06-12 20:14:08 +00002143 def test_seeking(self):
2144 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145 prefix_size = chunk_size - 2
2146 u_prefix = "a" * prefix_size
2147 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00002148 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002149 u_suffix = "\u8888\n"
2150 suffix = bytes(u_suffix.encode("utf-8"))
2151 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00002152 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002153 f.write(line*2)
2154 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002155 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00002157 self.assertEqual(s, prefix.decode("ascii"))
2158 self.assertEqual(f.tell(), prefix_size)
2159 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002160
Antoine Pitrou19690592009-06-12 20:14:08 +00002161 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002162 # Regression test for a specific bug
2163 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002164 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002165 f.write(data)
2166 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002167 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168 f._CHUNK_SIZE # Just test that it exists
2169 f._CHUNK_SIZE = 2
2170 f.readline()
2171 f.tell()
2172
Antoine Pitrou19690592009-06-12 20:14:08 +00002173 def test_seek_and_tell(self):
2174 #Test seek/tell using the StatefulIncrementalDecoder.
2175 # Make test faster by doing smaller seeks
2176 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002177
Antoine Pitrou19690592009-06-12 20:14:08 +00002178 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002179 """Tell/seek to various points within a data stream and ensure
2180 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002181 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002182 f.write(data)
2183 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002184 f = self.open(support.TESTFN, encoding='test_decoder')
2185 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002186 decoded = f.read()
2187 f.close()
2188
2189 for i in range(min_pos, len(decoded) + 1): # seek positions
2190 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002191 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002192 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002193 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002194 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002195 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002196 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002197 f.close()
2198
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002199 # Enable the test decoder.
2200 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002201
2202 # Run the tests.
2203 try:
2204 # Try each test case.
2205 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002206 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002207
2208 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002209 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2210 offset = CHUNK_SIZE - len(input)//2
2211 prefix = b'.'*offset
2212 # Don't bother seeking into the prefix (takes too long).
2213 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002214 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002215
2216 # Ensure our test decoder won't interfere with subsequent tests.
2217 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002218 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002219
Antoine Pitrou19690592009-06-12 20:14:08 +00002220 def test_encoded_writes(self):
2221 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002222 tests = ("utf-16",
2223 "utf-16-le",
2224 "utf-16-be",
2225 "utf-32",
2226 "utf-32-le",
2227 "utf-32-be")
2228 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002229 buf = self.BytesIO()
2230 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002231 # Check if the BOM is written only once (see issue1753).
2232 f.write(data)
2233 f.write(data)
2234 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002235 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002236 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002237 self.assertEqual(f.read(), data * 2)
2238 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002239
Antoine Pitrou19690592009-06-12 20:14:08 +00002240 def test_unreadable(self):
2241 class UnReadable(self.BytesIO):
2242 def readable(self):
2243 return False
2244 txt = self.TextIOWrapper(UnReadable())
2245 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002246
Antoine Pitrou19690592009-06-12 20:14:08 +00002247 def test_read_one_by_one(self):
2248 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002249 reads = ""
2250 while True:
2251 c = txt.read(1)
2252 if not c:
2253 break
2254 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002255 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002256
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002257 def test_readlines(self):
2258 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2259 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2260 txt.seek(0)
2261 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2262 txt.seek(0)
2263 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2264
Christian Heimes1a6387e2008-03-26 12:49:49 +00002265 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002266 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002267 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002268 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002269 reads = ""
2270 while True:
2271 c = txt.read(128)
2272 if not c:
2273 break
2274 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002275 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002276
Antoine Pitroueadca1d2012-10-16 23:02:27 +02002277 def test_writelines(self):
2278 l = ['ab', 'cd', 'ef']
2279 buf = self.BytesIO()
2280 txt = self.TextIOWrapper(buf)
2281 txt.writelines(l)
2282 txt.flush()
2283 self.assertEqual(buf.getvalue(), b'abcdef')
2284
2285 def test_writelines_userlist(self):
2286 l = UserList(['ab', 'cd', 'ef'])
2287 buf = self.BytesIO()
2288 txt = self.TextIOWrapper(buf)
2289 txt.writelines(l)
2290 txt.flush()
2291 self.assertEqual(buf.getvalue(), b'abcdef')
2292
2293 def test_writelines_error(self):
2294 txt = self.TextIOWrapper(self.BytesIO())
2295 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2296 self.assertRaises(TypeError, txt.writelines, None)
2297 self.assertRaises(TypeError, txt.writelines, b'abc')
2298
Christian Heimes1a6387e2008-03-26 12:49:49 +00002299 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002300 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002301
2302 # read one char at a time
2303 reads = ""
2304 while True:
2305 c = txt.read(1)
2306 if not c:
2307 break
2308 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002309 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002310
2311 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002312 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002313 txt._CHUNK_SIZE = 4
2314
2315 reads = ""
2316 while True:
2317 c = txt.read(4)
2318 if not c:
2319 break
2320 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002321 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002322
2323 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002324 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002325 txt._CHUNK_SIZE = 4
2326
2327 reads = txt.read(4)
2328 reads += txt.read(4)
2329 reads += txt.readline()
2330 reads += txt.readline()
2331 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002332 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002333
2334 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002335 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002336 txt._CHUNK_SIZE = 4
2337
2338 reads = txt.read(4)
2339 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002340 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002341
2342 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002343 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002344 txt._CHUNK_SIZE = 4
2345
2346 reads = txt.read(4)
2347 pos = txt.tell()
2348 txt.seek(0)
2349 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002350 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002351
2352 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002353 buffer = self.BytesIO(self.testdata)
2354 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002355
2356 self.assertEqual(buffer.seekable(), txt.seekable())
2357
Antoine Pitrou19690592009-06-12 20:14:08 +00002358 def test_append_bom(self):
2359 # The BOM is not written again when appending to a non-empty file
2360 filename = support.TESTFN
2361 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2362 with self.open(filename, 'w', encoding=charset) as f:
2363 f.write('aaa')
2364 pos = f.tell()
2365 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002366 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002367
2368 with self.open(filename, 'a', encoding=charset) as f:
2369 f.write('xxx')
2370 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002371 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002372
Antoine Pitrou19690592009-06-12 20:14:08 +00002373 def test_seek_bom(self):
2374 # Same test, but when seeking manually
2375 filename = support.TESTFN
2376 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2377 with self.open(filename, 'w', encoding=charset) as f:
2378 f.write('aaa')
2379 pos = f.tell()
2380 with self.open(filename, 'r+', encoding=charset) as f:
2381 f.seek(pos)
2382 f.write('zzz')
2383 f.seek(0)
2384 f.write('bbb')
2385 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002386 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002387
2388 def test_errors_property(self):
2389 with self.open(support.TESTFN, "w") as f:
2390 self.assertEqual(f.errors, "strict")
2391 with self.open(support.TESTFN, "w", errors="replace") as f:
2392 self.assertEqual(f.errors, "replace")
2393
Victor Stinner6a102812010-04-27 23:55:59 +00002394 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002395 def test_threads_write(self):
2396 # Issue6750: concurrent writes could duplicate data
2397 event = threading.Event()
2398 with self.open(support.TESTFN, "w", buffering=1) as f:
2399 def run(n):
2400 text = "Thread%03d\n" % n
2401 event.wait()
2402 f.write(text)
2403 threads = [threading.Thread(target=lambda n=x: run(n))
2404 for x in range(20)]
2405 for t in threads:
2406 t.start()
2407 time.sleep(0.02)
2408 event.set()
2409 for t in threads:
2410 t.join()
2411 with self.open(support.TESTFN) as f:
2412 content = f.read()
2413 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002414 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002415
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002416 def test_flush_error_on_close(self):
2417 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2418 def bad_flush():
2419 raise IOError()
2420 txt.flush = bad_flush
2421 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Petersona2d6d712012-12-20 12:24:10 -06002422 self.assertTrue(txt.closed)
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002423
2424 def test_multi_close(self):
2425 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2426 txt.close()
2427 txt.close()
2428 txt.close()
2429 self.assertRaises(ValueError, txt.flush)
2430
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002431 def test_readonly_attributes(self):
2432 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2433 buf = self.BytesIO(self.testdata)
2434 with self.assertRaises((AttributeError, TypeError)):
2435 txt.buffer = buf
2436
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002437 def test_read_nonbytes(self):
2438 # Issue #17106
2439 # Crash when underlying read() returns non-bytes
2440 class NonbytesStream(self.StringIO):
2441 read1 = self.StringIO.read
2442 class NonbytesStream(self.StringIO):
2443 read1 = self.StringIO.read
2444 t = self.TextIOWrapper(NonbytesStream('a'))
2445 with self.maybeRaises(TypeError):
2446 t.read(1)
2447 t = self.TextIOWrapper(NonbytesStream('a'))
2448 with self.maybeRaises(TypeError):
2449 t.readline()
2450 t = self.TextIOWrapper(NonbytesStream('a'))
2451 self.assertEqual(t.read(), u'a')
2452
2453 def test_illegal_decoder(self):
2454 # Issue #17106
2455 # Crash when decoder returns non-string
2456 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2457 encoding='quopri_codec')
2458 with self.maybeRaises(TypeError):
2459 t.read(1)
2460 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2461 encoding='quopri_codec')
2462 with self.maybeRaises(TypeError):
2463 t.readline()
2464 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2465 encoding='quopri_codec')
2466 with self.maybeRaises(TypeError):
2467 t.read()
2468
2469
Antoine Pitrou19690592009-06-12 20:14:08 +00002470class CTextIOWrapperTest(TextIOWrapperTest):
2471
2472 def test_initialization(self):
2473 r = self.BytesIO(b"\xc3\xa9\n\n")
2474 b = self.BufferedReader(r, 1000)
2475 t = self.TextIOWrapper(b)
2476 self.assertRaises(TypeError, t.__init__, b, newline=42)
2477 self.assertRaises(ValueError, t.read)
2478 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2479 self.assertRaises(ValueError, t.read)
2480
2481 def test_garbage_collection(self):
2482 # C TextIOWrapper objects are collected, and collecting them flushes
2483 # all data to disk.
2484 # The Python version has __del__, so it ends in gc.garbage instead.
2485 rawio = io.FileIO(support.TESTFN, "wb")
2486 b = self.BufferedWriter(rawio)
2487 t = self.TextIOWrapper(b, encoding="ascii")
2488 t.write("456def")
2489 t.x = t
2490 wr = weakref.ref(t)
2491 del t
2492 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002493 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002494 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002495 self.assertEqual(f.read(), b"456def")
2496
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002497 def test_rwpair_cleared_before_textio(self):
2498 # Issue 13070: TextIOWrapper's finalization would crash when called
2499 # after the reference to the underlying BufferedRWPair's writer got
2500 # cleared by the GC.
2501 for i in range(1000):
2502 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2503 t1 = self.TextIOWrapper(b1, encoding="ascii")
2504 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2505 t2 = self.TextIOWrapper(b2, encoding="ascii")
2506 # circular references
2507 t1.buddy = t2
2508 t2.buddy = t1
2509 support.gc_collect()
2510
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002511 maybeRaises = unittest.TestCase.assertRaises
2512
Charles-François Natali9ffcbf72011-10-06 19:09:45 +02002513
Antoine Pitrou19690592009-06-12 20:14:08 +00002514class PyTextIOWrapperTest(TextIOWrapperTest):
Serhiy Storchaka354d50e2013-02-03 17:10:42 +02002515 @contextlib.contextmanager
2516 def maybeRaises(self, *args, **kwds):
2517 yield
Antoine Pitrou19690592009-06-12 20:14:08 +00002518
2519
2520class IncrementalNewlineDecoderTest(unittest.TestCase):
2521
2522 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002523 # UTF-8 specific tests for a newline decoder
2524 def _check_decode(b, s, **kwargs):
2525 # We exercise getstate() / setstate() as well as decode()
2526 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002527 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002528 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002529 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002530
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002531 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002532
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002533 _check_decode(b'\xe8', "")
2534 _check_decode(b'\xa2', "")
2535 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002536
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002537 _check_decode(b'\xe8', "")
2538 _check_decode(b'\xa2', "")
2539 _check_decode(b'\x88', "\u8888")
2540
2541 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002542 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2543
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002544 decoder.reset()
2545 _check_decode(b'\n', "\n")
2546 _check_decode(b'\r', "")
2547 _check_decode(b'', "\n", final=True)
2548 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002549
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002550 _check_decode(b'\r', "")
2551 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002552
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002553 _check_decode(b'\r\r\n', "\n\n")
2554 _check_decode(b'\r', "")
2555 _check_decode(b'\r', "\n")
2556 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002557
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002558 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2559 _check_decode(b'\xe8\xa2\x88', "\u8888")
2560 _check_decode(b'\n', "\n")
2561 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2562 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002563
Antoine Pitrou19690592009-06-12 20:14:08 +00002564 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002565 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002566 if encoding is not None:
2567 encoder = codecs.getincrementalencoder(encoding)()
2568 def _decode_bytewise(s):
2569 # Decode one byte at a time
2570 for b in encoder.encode(s):
2571 result.append(decoder.decode(b))
2572 else:
2573 encoder = None
2574 def _decode_bytewise(s):
2575 # Decode one char at a time
2576 for c in s:
2577 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002578 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002579 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002580 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002581 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002582 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002583 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002584 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002585 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002586 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002587 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002588 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002589 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002590 input = "abc"
2591 if encoder is not None:
2592 encoder.reset()
2593 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002594 self.assertEqual(decoder.decode(input), "abc")
2595 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002596
2597 def test_newline_decoder(self):
2598 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002599 # None meaning the IncrementalNewlineDecoder takes unicode input
2600 # rather than bytes input
2601 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002602 'utf-16', 'utf-16-le', 'utf-16-be',
2603 'utf-32', 'utf-32-le', 'utf-32-be',
2604 )
2605 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002606 decoder = enc and codecs.getincrementaldecoder(enc)()
2607 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2608 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002609 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002610 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2611 self.check_newline_decoding_utf8(decoder)
2612
2613 def test_newline_bytes(self):
2614 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2615 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002616 self.assertEqual(dec.newlines, None)
2617 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2618 self.assertEqual(dec.newlines, None)
2619 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2620 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002621 dec = self.IncrementalNewlineDecoder(None, translate=False)
2622 _check(dec)
2623 dec = self.IncrementalNewlineDecoder(None, translate=True)
2624 _check(dec)
2625
2626class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2627 pass
2628
2629class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2630 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002631
Christian Heimes1a6387e2008-03-26 12:49:49 +00002632
2633# XXX Tests for open()
2634
2635class MiscIOTest(unittest.TestCase):
2636
Benjamin Petersonad100c32008-11-20 22:06:22 +00002637 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002638 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002639
Antoine Pitrou19690592009-06-12 20:14:08 +00002640 def test___all__(self):
2641 for name in self.io.__all__:
2642 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002643 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002644 if name == "open":
2645 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002646 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002647 self.assertTrue(issubclass(obj, Exception), name)
2648 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002649 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002650
Benjamin Petersonad100c32008-11-20 22:06:22 +00002651 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002652 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002653 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002654 f.close()
2655
Antoine Pitrou19690592009-06-12 20:14:08 +00002656 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002657 self.assertEqual(f.name, support.TESTFN)
2658 self.assertEqual(f.buffer.name, support.TESTFN)
2659 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2660 self.assertEqual(f.mode, "U")
2661 self.assertEqual(f.buffer.mode, "rb")
2662 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002663 f.close()
2664
Antoine Pitrou19690592009-06-12 20:14:08 +00002665 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002666 self.assertEqual(f.mode, "w+")
2667 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2668 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002669
Antoine Pitrou19690592009-06-12 20:14:08 +00002670 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002671 self.assertEqual(g.mode, "wb")
2672 self.assertEqual(g.raw.mode, "wb")
2673 self.assertEqual(g.name, f.fileno())
2674 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002675 f.close()
2676 g.close()
2677
Antoine Pitrou19690592009-06-12 20:14:08 +00002678 def test_io_after_close(self):
2679 for kwargs in [
2680 {"mode": "w"},
2681 {"mode": "wb"},
2682 {"mode": "w", "buffering": 1},
2683 {"mode": "w", "buffering": 2},
2684 {"mode": "wb", "buffering": 0},
2685 {"mode": "r"},
2686 {"mode": "rb"},
2687 {"mode": "r", "buffering": 1},
2688 {"mode": "r", "buffering": 2},
2689 {"mode": "rb", "buffering": 0},
2690 {"mode": "w+"},
2691 {"mode": "w+b"},
2692 {"mode": "w+", "buffering": 1},
2693 {"mode": "w+", "buffering": 2},
2694 {"mode": "w+b", "buffering": 0},
2695 ]:
2696 f = self.open(support.TESTFN, **kwargs)
2697 f.close()
2698 self.assertRaises(ValueError, f.flush)
2699 self.assertRaises(ValueError, f.fileno)
2700 self.assertRaises(ValueError, f.isatty)
2701 self.assertRaises(ValueError, f.__iter__)
2702 if hasattr(f, "peek"):
2703 self.assertRaises(ValueError, f.peek, 1)
2704 self.assertRaises(ValueError, f.read)
2705 if hasattr(f, "read1"):
2706 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002707 if hasattr(f, "readall"):
2708 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002709 if hasattr(f, "readinto"):
2710 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2711 self.assertRaises(ValueError, f.readline)
2712 self.assertRaises(ValueError, f.readlines)
2713 self.assertRaises(ValueError, f.seek, 0)
2714 self.assertRaises(ValueError, f.tell)
2715 self.assertRaises(ValueError, f.truncate)
2716 self.assertRaises(ValueError, f.write,
2717 b"" if "b" in kwargs['mode'] else "")
2718 self.assertRaises(ValueError, f.writelines, [])
2719 self.assertRaises(ValueError, next, f)
2720
2721 def test_blockingioerror(self):
2722 # Various BlockingIOError issues
2723 self.assertRaises(TypeError, self.BlockingIOError)
2724 self.assertRaises(TypeError, self.BlockingIOError, 1)
2725 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2726 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2727 b = self.BlockingIOError(1, "")
2728 self.assertEqual(b.characters_written, 0)
2729 class C(unicode):
2730 pass
2731 c = C("")
2732 b = self.BlockingIOError(1, c)
2733 c.b = b
2734 b.c = c
2735 wr = weakref.ref(c)
2736 del c, b
2737 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002738 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002739
2740 def test_abcs(self):
2741 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002742 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2743 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2744 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2745 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002746
2747 def _check_abc_inheritance(self, abcmodule):
2748 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002749 self.assertIsInstance(f, abcmodule.IOBase)
2750 self.assertIsInstance(f, abcmodule.RawIOBase)
2751 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2752 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002753 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002754 self.assertIsInstance(f, abcmodule.IOBase)
2755 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2756 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2757 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002758 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002759 self.assertIsInstance(f, abcmodule.IOBase)
2760 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2761 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2762 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002763
2764 def test_abc_inheritance(self):
2765 # Test implementations inherit from their respective ABCs
2766 self._check_abc_inheritance(self)
2767
2768 def test_abc_inheritance_official(self):
2769 # Test implementations inherit from the official ABCs of the
2770 # baseline "io" module.
2771 self._check_abc_inheritance(io)
2772
Antoine Pitrou5aa7df32011-11-21 20:16:44 +01002773 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2774 def test_nonblock_pipe_write_bigbuf(self):
2775 self._test_nonblock_pipe_write(16*1024)
2776
2777 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2778 def test_nonblock_pipe_write_smallbuf(self):
2779 self._test_nonblock_pipe_write(1024)
2780
2781 def _set_non_blocking(self, fd):
2782 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2783 self.assertNotEqual(flags, -1)
2784 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2785 self.assertEqual(res, 0)
2786
2787 def _test_nonblock_pipe_write(self, bufsize):
2788 sent = []
2789 received = []
2790 r, w = os.pipe()
2791 self._set_non_blocking(r)
2792 self._set_non_blocking(w)
2793
2794 # To exercise all code paths in the C implementation we need
2795 # to play with buffer sizes. For instance, if we choose a
2796 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2797 # then we will never get a partial write of the buffer.
2798 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2799 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2800
2801 with rf, wf:
2802 for N in 9999, 73, 7574:
2803 try:
2804 i = 0
2805 while True:
2806 msg = bytes([i % 26 + 97]) * N
2807 sent.append(msg)
2808 wf.write(msg)
2809 i += 1
2810
2811 except self.BlockingIOError as e:
2812 self.assertEqual(e.args[0], errno.EAGAIN)
2813 sent[-1] = sent[-1][:e.characters_written]
2814 received.append(rf.read())
2815 msg = b'BLOCKED'
2816 wf.write(msg)
2817 sent.append(msg)
2818
2819 while True:
2820 try:
2821 wf.flush()
2822 break
2823 except self.BlockingIOError as e:
2824 self.assertEqual(e.args[0], errno.EAGAIN)
2825 self.assertEqual(e.characters_written, 0)
2826 received.append(rf.read())
2827
2828 received += iter(rf.read, None)
2829
2830 sent, received = b''.join(sent), b''.join(received)
2831 self.assertTrue(sent == received)
2832 self.assertTrue(wf.closed)
2833 self.assertTrue(rf.closed)
2834
Antoine Pitrou19690592009-06-12 20:14:08 +00002835class CMiscIOTest(MiscIOTest):
2836 io = io
2837
2838class PyMiscIOTest(MiscIOTest):
2839 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002840
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002841
2842@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2843class SignalsTest(unittest.TestCase):
2844
2845 def setUp(self):
2846 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2847
2848 def tearDown(self):
2849 signal.signal(signal.SIGALRM, self.oldalrm)
2850
2851 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002852 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002853
2854 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002855 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2856 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002857 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2858 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002859 invokes the signal handler, and bubbles up the exception raised
2860 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002861 read_results = []
2862 def _read():
2863 s = os.read(r, 1)
2864 read_results.append(s)
2865 t = threading.Thread(target=_read)
2866 t.daemon = True
2867 r, w = os.pipe()
2868 try:
2869 wio = self.io.open(w, **fdopen_kwargs)
2870 t.start()
2871 signal.alarm(1)
2872 # Fill the pipe enough that the write will be blocking.
2873 # It will be interrupted by the timer armed above. Since the
2874 # other thread has read one byte, the low-level write will
2875 # return with a successful (partial) result rather than an EINTR.
2876 # The buffered IO layer must check for pending signal
2877 # handlers, which in this case will invoke alarm_interrupt().
2878 self.assertRaises(ZeroDivisionError,
Antoine Pitrou68915d72013-04-24 23:31:38 +02002879 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002880 t.join()
2881 # We got one byte, get another one and check that it isn't a
2882 # repeat of the first one.
2883 read_results.append(os.read(r, 1))
2884 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2885 finally:
2886 os.close(w)
2887 os.close(r)
2888 # This is deliberate. If we didn't close the file descriptor
2889 # before closing wio, wio would try to flush its internal
2890 # buffer, and block again.
2891 try:
2892 wio.close()
2893 except IOError as e:
2894 if e.errno != errno.EBADF:
2895 raise
2896
2897 def test_interrupted_write_unbuffered(self):
2898 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2899
2900 def test_interrupted_write_buffered(self):
2901 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2902
2903 def test_interrupted_write_text(self):
2904 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2905
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002906 def check_reentrant_write(self, data, **fdopen_kwargs):
2907 def on_alarm(*args):
2908 # Will be called reentrantly from the same thread
2909 wio.write(data)
Victor Stinner4c41f842011-07-05 13:29:26 +02002910 1//0
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002911 signal.signal(signal.SIGALRM, on_alarm)
2912 r, w = os.pipe()
2913 wio = self.io.open(w, **fdopen_kwargs)
2914 try:
2915 signal.alarm(1)
2916 # Either the reentrant call to wio.write() fails with RuntimeError,
2917 # or the signal handler raises ZeroDivisionError.
2918 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2919 while 1:
2920 for i in range(100):
2921 wio.write(data)
2922 wio.flush()
2923 # Make sure the buffer doesn't fill up and block further writes
2924 os.read(r, len(data) * 100)
2925 exc = cm.exception
2926 if isinstance(exc, RuntimeError):
2927 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2928 finally:
2929 wio.close()
2930 os.close(r)
2931
2932 def test_reentrant_write_buffered(self):
2933 self.check_reentrant_write(b"xy", mode="wb")
2934
2935 def test_reentrant_write_text(self):
2936 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2937
Antoine Pitrou6439c002011-02-25 21:35:47 +00002938 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2939 """Check that a buffered read, when it gets interrupted (either
2940 returning a partial result or EINTR), properly invokes the signal
2941 handler and retries if the latter returned successfully."""
2942 r, w = os.pipe()
2943 fdopen_kwargs["closefd"] = False
2944 def alarm_handler(sig, frame):
2945 os.write(w, b"bar")
2946 signal.signal(signal.SIGALRM, alarm_handler)
2947 try:
2948 rio = self.io.open(r, **fdopen_kwargs)
2949 os.write(w, b"foo")
2950 signal.alarm(1)
2951 # Expected behaviour:
2952 # - first raw read() returns partial b"foo"
2953 # - second raw read() returns EINTR
2954 # - third raw read() returns b"bar"
2955 self.assertEqual(decode(rio.read(6)), "foobar")
2956 finally:
2957 rio.close()
2958 os.close(w)
2959 os.close(r)
2960
2961 def test_interrupterd_read_retry_buffered(self):
2962 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2963 mode="rb")
2964
2965 def test_interrupterd_read_retry_text(self):
2966 self.check_interrupted_read_retry(lambda x: x,
2967 mode="r")
2968
2969 @unittest.skipUnless(threading, 'Threading required for this test.')
2970 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2971 """Check that a buffered write, when it gets interrupted (either
2972 returning a partial result or EINTR), properly invokes the signal
2973 handler and retries if the latter returned successfully."""
2974 select = support.import_module("select")
2975 # A quantity that exceeds the buffer size of an anonymous pipe's
2976 # write end.
Antoine Pitrou68915d72013-04-24 23:31:38 +02002977 N = support.PIPE_MAX_SIZE
Antoine Pitrou6439c002011-02-25 21:35:47 +00002978 r, w = os.pipe()
2979 fdopen_kwargs["closefd"] = False
2980 # We need a separate thread to read from the pipe and allow the
2981 # write() to finish. This thread is started after the SIGALRM is
2982 # received (forcing a first EINTR in write()).
2983 read_results = []
2984 write_finished = False
2985 def _read():
2986 while not write_finished:
2987 while r in select.select([r], [], [], 1.0)[0]:
2988 s = os.read(r, 1024)
2989 read_results.append(s)
2990 t = threading.Thread(target=_read)
2991 t.daemon = True
2992 def alarm1(sig, frame):
2993 signal.signal(signal.SIGALRM, alarm2)
2994 signal.alarm(1)
2995 def alarm2(sig, frame):
2996 t.start()
2997 signal.signal(signal.SIGALRM, alarm1)
2998 try:
2999 wio = self.io.open(w, **fdopen_kwargs)
3000 signal.alarm(1)
3001 # Expected behaviour:
3002 # - first raw write() is partial (because of the limited pipe buffer
3003 # and the first alarm)
3004 # - second raw write() returns EINTR (because of the second alarm)
3005 # - subsequent write()s are successful (either partial or complete)
3006 self.assertEqual(N, wio.write(item * N))
3007 wio.flush()
3008 write_finished = True
3009 t.join()
3010 self.assertEqual(N, sum(len(x) for x in read_results))
3011 finally:
3012 write_finished = True
3013 os.close(w)
3014 os.close(r)
3015 # This is deliberate. If we didn't close the file descriptor
3016 # before closing wio, wio would try to flush its internal
3017 # buffer, and could block (in case of failure).
3018 try:
3019 wio.close()
3020 except IOError as e:
3021 if e.errno != errno.EBADF:
3022 raise
3023
3024 def test_interrupterd_write_retry_buffered(self):
3025 self.check_interrupted_write_retry(b"x", mode="wb")
3026
3027 def test_interrupterd_write_retry_text(self):
3028 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3029
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003030
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003031class CSignalsTest(SignalsTest):
3032 io = io
3033
3034class PySignalsTest(SignalsTest):
3035 io = pyio
3036
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00003037 # Handling reentrancy issues would slow down _pyio even more, so the
3038 # tests are disabled.
3039 test_reentrant_write_buffered = None
3040 test_reentrant_write_text = None
3041
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003042
Christian Heimes1a6387e2008-03-26 12:49:49 +00003043def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00003044 tests = (CIOTest, PyIOTest,
3045 CBufferedReaderTest, PyBufferedReaderTest,
3046 CBufferedWriterTest, PyBufferedWriterTest,
3047 CBufferedRWPairTest, PyBufferedRWPairTest,
3048 CBufferedRandomTest, PyBufferedRandomTest,
3049 StatefulIncrementalDecoderTest,
3050 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3051 CTextIOWrapperTest, PyTextIOWrapperTest,
3052 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00003053 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00003054 )
3055
3056 # Put the namespaces of the IO module we are testing and some useful mock
3057 # classes in the __dict__ of each test.
3058 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00003059 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00003060 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3061 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3062 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3063 globs = globals()
3064 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3065 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3066 # Avoid turning open into a bound method.
3067 py_io_ns["open"] = pyio.OpenWrapper
3068 for test in tests:
3069 if test.__name__.startswith("C"):
3070 for name, obj in c_io_ns.items():
3071 setattr(test, name, obj)
3072 elif test.__name__.startswith("Py"):
3073 for name, obj in py_io_ns.items():
3074 setattr(test, name, obj)
3075
3076 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00003077
3078if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00003079 test_main()